Sat, 14 Jul 2018 17:21:25 +0200
Versie 0.6.3. MQTT device berichten alleen als een fermenter ingeschakeld is. MQTT fermenter birth en death berichhten als een fementer in of uitgeschakeld wordt. MQTT node death bericht bij normaal afsluiten van de daemon. Alle MQTT persistent berichten worden nu goed opgeruikmd.
/***************************************************************************** * Copyright (C) 2016-2018 * * Michiel Broek <mbroek at mbse dot eu> * * This file is part of the mbsePi-apps * * This is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the * Free Software Foundation; either version 2, or (at your option) any * later version. * * mbsePi-apps is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public License * along with ThermFerm; see the file COPYING. If not, write to the Free * Software Foundation, 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. *****************************************************************************/ #include "thermferm.h" #include "xutil.h" #include "mqtt.h" extern sys_config Config; extern int debug; extern const char UNITMODE[5][8]; extern const char PROFSTATE[5][6]; extern const char TEMPSTATE[3][8]; int Sequence = 0; #ifdef HAVE_MOSQUITTO_H /* Global variables for use in callbacks. */ int mqtt_qos = 0; int mqtt_status = STATUS_CONNECTING; int mqtt_mid_sent = 0; int mqtt_last_mid = -1; int mqtt_last_mid_sent = -1; int mqtt_connected = TRUE; int mqtt_disconnect_sent = FALSE; int mqtt_connect_lost = FALSE; int mqtt_my_shutdown = FALSE; int mqtt_use = FALSE; int keepalive = 60; unsigned int max_inflight = 20; struct mosquitto *mosq = NULL; char *state = NULL; char my_hostname[256]; char *payload_header(void) { char *tmp, buf[128]; tmp = xstrcpy((char *)"{\"timestamp\":"); sprintf(buf, "%ld", time(NULL)); tmp = xstrcat(tmp, buf); tmp = xstrcat(tmp, (char *)",\"seq\":"); sprintf(buf, "%d", Sequence++); tmp = xstrcat(tmp, buf); tmp = xstrcat(tmp, (char *)",\"metric\":"); return tmp; } char *topic_base(char *msgtype) { char *tmp; tmp = xstrcpy((char *)"mbv1.0/fermenters/"); tmp = xstrcat(tmp, msgtype); tmp = xstrcat(tmp, (char *)"/"); tmp = xstrcat(tmp, my_hostname); return tmp; } void my_connect_callback(struct mosquitto *my_mosq, void *obj, int result) { char *topic = NULL; if (mqtt_connect_lost) { mqtt_connect_lost = FALSE; syslog(LOG_NOTICE, "MQTT: reconnect: %s", mosquitto_connack_string(result)); } if (!result) { topic = topic_base((char *)"NCMD"); topic = xstrcat(topic, (char *)"/#"); mosquitto_subscribe(mosq, NULL, topic, 0); free(topic); topic = topic_base((char *)"DCMD"); topic = xstrcat(topic, (char *)"/#"); mosquitto_subscribe(mosq, NULL, topic, 0); free(topic); topic = NULL; mqtt_status = STATUS_CONNACK_RECVD; } else { syslog(LOG_NOTICE, "MQTT: my_connect_callback: %s\n", mosquitto_connack_string(result)); } } void my_disconnect_callback(struct mosquitto *my_mosq, void *obj, int rc) { if (mqtt_my_shutdown) { syslog(LOG_NOTICE, "MQTT: acknowledged DISCONNECT from %s", Config.mqtt_host); mqtt_connected = FALSE; } else { /* * The remote server was brought down. We must keep running */ syslog(LOG_NOTICE, "MQTT: received DISCONNECT from %s, connection lost", Config.mqtt_host); mqtt_connect_lost = TRUE; } } void my_publish_callback(struct mosquitto *my_mosq, void *obj, int mid) { mqtt_last_mid_sent = mid; } void my_subscribe_callback(struct mosquitto *my_mosq, void *userdata, int mid, int qos_count, const int *granted_qos) { int i; syslog(LOG_NOTICE, "Subscribed (mid: %d): %d", mid, granted_qos[0]); for (i = 1; i < qos_count; i++) { syslog(LOG_NOTICE, " %d", granted_qos[i]); } } void my_log_callback(struct mosquitto *my_mosq, void *obj, int level, const char *str) { syslog(LOG_NOTICE, "MQTT: %s", str); if (debug) fprintf(stdout, "MQTT: %s\n", str); } void my_message_callback(struct mosquitto *my_mosq, void *userdata, const struct mosquitto_message *message) { if (message->payloadlen) { syslog(LOG_NOTICE, "MQTT: message callback %s :: %d", message->topic, message->payloadlen); // TODO: process subscribed topics here. } else { syslog(LOG_NOTICE, "MQTT: message callback %s (null)", message->topic); } } void publisher(struct mosquitto *my_mosq, char *topic, char *payload, bool retain) { // publish the data if (payload) mosquitto_publish(my_mosq, &mqtt_mid_sent, topic, strlen(payload), payload, mqtt_qos, retain); else mosquitto_publish(my_mosq, &mqtt_mid_sent, topic, 0, NULL, mqtt_qos, retain); } void pub_domoticz_temp(int idx, int value) { char *dload = NULL; char sidx[10], sval[20]; if (idx == 0) return; sprintf(sidx, "%d", idx); sprintf(sval, "%.3f", value / 1000.0); dload = xstrcpy((char *)"{\"command\":\"udevice\",\"idx\":"); dload = xstrcat(dload, sidx); dload = xstrcat(dload, (char *)",\"nvalue\":0,\"svalue\":\""); dload = xstrcat(dload, sval); dload = xstrcat(dload, (char *)"\"}"); publisher(mosq, (char *)"domoticz/in", dload, false); free(dload); dload = NULL; } void pub_domoticz_output(int idx, int value) { char *dload = NULL; char sidx[10], sval[10]; if (idx == 0) return; sprintf(sidx, "%d", idx); sprintf(sval, "%d", value); dload = xstrcpy((char *)"{\"command\":\"udevice\",\"idx\":"); dload = xstrcat(dload, sidx); dload = xstrcat(dload, (char *)",\"nvalue\":"); if (value >= 50) dload = xstrcat(dload, (char *)"1"); else dload = xstrcat(dload, (char *)"0"); dload = xstrcat(dload, (char *)",\"svalue\":\""); dload = xstrcat(dload, sval); dload = xstrcat(dload, (char *)"\"}"); publisher(mosq, (char *)"domoticz/in", dload, false); free(dload); dload = NULL; } char *unit_data(units_list *unit, bool birth) { char *payload = NULL; char buf[128]; bool comma = false; profiles_list *profile; prof_step *pstep; payload = xstrcat(payload, (char *)"{"); if (birth) { payload = xstrcat(payload, (char *)"\"uuid\":\""); payload = xstrcat(payload, unit->uuid); payload = xstrcat(payload, (char *)"\",\"alias\":\""); payload = xstrcat(payload, unit->alias); payload = xstrcat(payload, (char *)"\","); } payload = xstrcat(payload, (char *)"\"name\":\""); payload = xstrcat(payload, unit->name); if (unit->air_address) { payload = xstrcat(payload, (char *)"\",\"air\":{\"address\":\""); payload = xstrcat(payload, unit->air_address); payload = xstrcat(payload, (char *)"\",\"state\":\""); payload = xstrcat(payload, (char *)TEMPSTATE[unit->air_state]); payload = xstrcat(payload, (char *)"\",\"temperature\":"); sprintf(buf, "%.3f", unit->air_temperature / 1000.0); payload = xstrcat(payload, buf); payload = xstrcat(payload, (char *)"}"); } else { payload = xstrcat(payload, (char *)"\",\"air\":null"); } if (unit->beer_address) { payload = xstrcat(payload, (char *)",\"beer\":{\"address\":\""); payload = xstrcat(payload, unit->beer_address); payload = xstrcat(payload, (char *)"\",\"state\":\""); payload = xstrcat(payload, (char *)TEMPSTATE[unit->beer_state]); payload = xstrcat(payload, (char *)"\",\"temperature\":"); sprintf(buf, "%.3f", unit->beer_temperature / 1000.0); payload = xstrcat(payload, buf); payload = xstrcat(payload, (char *)"}"); } else { payload = xstrcat(payload, (char *)",\"beer\":null"); } if (unit->chiller_address) { payload = xstrcat(payload, (char *)",\"chiller\":{\"address\":\""); payload = xstrcat(payload, unit->chiller_address); payload = xstrcat(payload, (char *)"\",\"state\":\""); payload = xstrcat(payload, (char *)TEMPSTATE[unit->chiller_state]); payload = xstrcat(payload, (char *)"\",\"temperature\":"); sprintf(buf, "%.3f", unit->chiller_temperature / 1000.0); payload = xstrcat(payload, buf); payload = xstrcat(payload, (char *)"}"); } else { payload = xstrcat(payload, (char *)",\"chiller\":null"); } if (unit->heater_address) { payload = xstrcat(payload, (char *)",\"heater\":{\"address\":\""); payload = xstrcat(payload, unit->heater_address); payload = xstrcat(payload, (char *)"\",\"state\":"); sprintf(buf, "%d", unit->heater_state); payload = xstrcat(payload, buf); payload = xstrcat(payload, (char *)"}"); } else { payload = xstrcat(payload, (char *)",\"heater\":null"); } if (unit->cooler_address) { payload = xstrcat(payload, (char *)",\"cooler\":{\"address\":\""); payload = xstrcat(payload, unit->cooler_address); payload = xstrcat(payload, (char *)"\",\"state\":"); sprintf(buf, "%d", unit->cooler_state); payload = xstrcat(payload, buf); payload = xstrcat(payload, (char *)"}"); } else { payload = xstrcat(payload, (char *)",\"cooler\":null"); } if (unit->fan_address) { payload = xstrcat(payload, (char *)",\"fan\":{\"address\":\""); payload = xstrcat(payload, unit->fan_address); payload = xstrcat(payload, (char *)"\",\"state\":"); sprintf(buf, "%d", unit->fan_state); payload = xstrcat(payload, buf); payload = xstrcat(payload, (char *)"}"); } else { payload = xstrcat(payload, (char *)",\"fan\":null"); } if (unit->door_address) { payload = xstrcat(payload, (char *)",\"door\":{\"address\":\""); payload = xstrcat(payload, unit->door_address); payload = xstrcat(payload, (char *)"\",\"state\":"); sprintf(buf, "%d", unit->door_state); payload = xstrcat(payload, buf); payload = xstrcat(payload, (char *)"}"); } else { payload = xstrcat(payload, (char *)",\"door\":null"); } if (unit->light_address) { payload = xstrcat(payload, (char *)",\"light\":{\"address\":\""); payload = xstrcat(payload, unit->light_address); payload = xstrcat(payload, (char *)"\",\"state\":"); sprintf(buf, "%d", unit->light_state); payload = xstrcat(payload, buf); payload = xstrcat(payload, (char *)"}"); } else { payload = xstrcat(payload, (char *)",\"light\":null"); } if (unit->psu_address) { payload = xstrcat(payload, (char *)",\"psu\":{\"address\":\""); payload = xstrcat(payload, unit->psu_address); payload = xstrcat(payload, (char *)"\",\"state\":"); sprintf(buf, "%d", unit->psu_state); payload = xstrcat(payload, buf); payload = xstrcat(payload, (char *)"}"); } else { payload = xstrcat(payload, (char *)",\"psu\":null"); } payload = xstrcat(payload, (char *)",\"mode\":\""); payload = xstrcat(payload, (char *)UNITMODE[unit->mode]); payload = xstrcat(payload, (char *)"\",\"setpoint\":{\"low\":"); sprintf(buf, "%.1f", unit->PID_heat->SetP); payload = xstrcat(payload, buf); payload = xstrcat(payload, (char *)",\"high\":"); sprintf(buf, "%.1f", unit->PID_cool->SetP); payload = xstrcat(payload, buf); payload = xstrcat(payload, (char *)"}"); if (unit->mode == UNITMODE_PROFILE && unit->profile) { for (profile = Config.profiles; profile; profile = profile->next) { if (strcmp(unit->profile, profile->uuid) == 0) { payload = xstrcat(payload, (char *)",\"profile\":{\"uuid\":\""); payload = xstrcat(payload, unit->profile); payload = xstrcat(payload, (char *)",\"name\":\""); payload = xstrcat(payload, profile->name); payload = xstrcat(payload, (char *)"\",\"inittemp\":{\"low\":"); sprintf(buf, "%.1f", profile->inittemp_lo); payload = xstrcat(payload, buf); payload = xstrcat(payload, (char *)",\"high\":"); sprintf(buf, "%.1f", profile->inittemp_hi); payload = xstrcat(payload, buf); payload = xstrcat(payload, (char *)"},\"fridgemode\":"); sprintf(buf, "%d", profile->fridge_mode); payload = xstrcat(payload, buf); comma = false; if (profile->steps) { payload = xstrcat(payload, (char *)",\"steps\":["); for (pstep = profile->steps; pstep; pstep = pstep->next) { if (comma) payload = xstrcat(payload, (char *)","); payload = xstrcat(payload, (char *)"{\"resttime\":"); sprintf(buf, "%d", pstep->resttime); payload = xstrcat(payload, buf); payload = xstrcat(payload, (char *)",\"steptime\":"); sprintf(buf, "%d", pstep->steptime); payload = xstrcat(payload, buf); payload = xstrcat(payload, (char *)",\"target\":{\"low\":"); sprintf(buf, "%.1f", pstep->target_lo); payload = xstrcat(payload, buf); payload = xstrcat(payload, (char *)",\"high\":"); sprintf(buf, "%.1f", pstep->target_hi); payload = xstrcat(payload, buf); payload = xstrcat(payload, (char *)"},\"fridgemode\":"); sprintf(buf, "%d", pstep->fridge_mode); payload = xstrcat(payload, buf); payload = xstrcat(payload, (char *)"}"); comma = true; } payload = xstrcat(payload, (char *)"]"); } else { payload = xstrcat(payload, (char *)",\"steps\":null"); } payload = xstrcat(payload, (char *)"}"); break; } } } else { payload = xstrcat(payload, (char *)",\"profile\":null"); } payload = xstrcat(payload, (char *)"}"); return payload; } /** * @brief Publish DBIRTH for all active units. If there are no active units, don't * publish anything. This function should be called at program start. */ void publishDBirthAll(void) { char *payload = NULL; units_list *unit; int comma = FALSE; payload = payload_header(); payload = xstrcat(payload, (char *)"{\"units\":["); for (unit = Config.units; unit; unit = unit->next) { if (unit->mode != UNITMODE_OFF) { if (comma) payload = xstrcat(payload, (char *)","); payload = xstrcat(payload, unit_data(unit, true)); comma = TRUE; } } if (comma) { // Only publish if there is at least one unit active. payload = xstrcat(payload, (char *)"]}}"); publisher(mosq, topic_base((char *)"DBIRTH"), payload, true); } free(payload); payload = NULL; } #endif void publishDData(units_list *unit) { #ifdef HAVE_MOSQUITTO_H char *payload = NULL, *topic = NULL; if (mqtt_use) { payload = payload_header(); payload = xstrcat(payload, unit_data(unit, false)); payload = xstrcat(payload, (char *)"}"); topic = xstrcat(topic_base((char *)"DDATA"), (char *)"/"); topic = xstrcat(topic, unit->alias); publisher(mosq, topic, payload, false); free(payload); payload = NULL; free(topic); topic = NULL; } #endif } void publishDBirth(units_list *unit) { #ifdef HAVE_MOSQUITTO_H char *payload = NULL, *topic = NULL; if (mqtt_use) { payload = payload_header(); payload = xstrcat(payload, unit_data(unit, true)); payload = xstrcat(payload, (char *)"}"); topic = xstrcat(topic_base((char *)"DBIRTH"), (char *)"/"); topic = xstrcat(topic, unit->alias); publisher(mosq, topic, payload, true); free(payload); payload = NULL; free(topic); topic = NULL; } #endif } void publishDDeath(units_list *unit) { #ifdef HAVE_MOSQUITTO_H char *topic = NULL; if (mqtt_use) { // First delete presistent DBIRTH topic topic = xstrcat(topic_base((char *)"DBIRTH"), (char *)"/"); topic = xstrcat(topic, unit->alias); publisher(mosq, topic, NULL, true); free(topic); topic = NULL; topic = xstrcat(topic_base((char *)"DDEATH"), (char *)"/"); topic = xstrcat(topic, unit->alias); publisher(mosq, topic, NULL, true); free(topic); topic = NULL; } #endif } void publishNData(bool birth, int flag) { #ifdef HAVE_MOSQUITTO_H char *payload = NULL, sidx[10], buf[64]; struct utsname ubuf; bool comma = false; payload = payload_header(); payload = xstrcat(payload, (char *)"{"); if (birth || flag & MQTT_NODE_CONTROL) { payload = xstrcat(payload, (char *)"\"nodecontrol\":{\"reboot\":false,\"rebirth\":false,\"nextserver\":false,\"scanrate\":3000}"); comma = true; } if (birth) { if (comma) payload = xstrcat(payload, (char *)","); #ifdef HAVE_WIRINGPI_H payload = xstrcat(payload, (char *)"\"properties\":{\"hardwaremake\":\"Raspberry\",\"hardwaremodel\":\"Unknown\""); #else payload = xstrcat(payload, (char *)"\"properties\":{\"hardwaremake\":\"Unknown\",\"hardwaremodel\":\"Unknown\""); #endif if (uname(&ubuf) == 0) { payload = xstrcat(payload, (char *)",\"os\":\""); payload = xstrcat(payload, ubuf.sysname); payload = xstrcat(payload, (char *)"\",\"os_version\":\""); payload = xstrcat(payload, ubuf.release); payload = xstrcat(payload, (char *)"\""); } else { payload = xstrcat(payload, (char *)",\"os\":\"Unknown\",\"os_version\":\"Unknown\""); } payload = xstrcat(payload, (char *)",\"FW\":\""); payload = xstrcat(payload, (char *)VERSION); payload = xstrcat(payload, (char *)"\"}"); comma = true; } if (Config.temp_address || Config.hum_address) { if (comma) payload = xstrcat(payload, (char *)","); payload = xstrcat(payload, (char *)"\"HT\":{"); if (Config.temp_address) { payload = xstrcat(payload, (char *)"\"temperature\":"); sprintf(buf, "%.1f", Config.temp_value / 1000.0); payload = xstrcat(payload, buf); } if (Config.temp_address && Config.hum_address) payload = xstrcat(payload, (char *)","); if (Config.hum_address) { payload = xstrcat(payload, (char *)"\"humidity\":"); sprintf(buf, "%.1f", Config.hum_value / 1000.0); payload = xstrcat(payload, buf); } payload = xstrcat(payload, (char *)"}"); } payload = xstrcat(payload, (char *)"}}"); if (birth) publisher(mosq, topic_base((char *)"NBIRTH"), payload, true); else publisher(mosq, topic_base((char *)"NDATA"), payload, false); free(payload); payload = NULL; if ((Config.temp_address || Config.hum_address) && Config.temp_hum_idx) { sprintf(sidx, "%d", Config.temp_hum_idx); sprintf(buf, "%.1f;%.1f;0", Config.temp_value / 1000.0, Config.hum_value / 1000.0); payload = xstrcpy((char *)"{\"command\":\"udevice\",\"idx\":"); payload = xstrcat(payload, sidx); payload = xstrcat(payload, (char *)",\"nvalue\":0,\"svalue\":\""); payload = xstrcat(payload, buf); payload = xstrcat(payload, (char *)"\"}"); publisher(mosq, (char *)"domoticz/in", payload, false); free(payload); payload = NULL; } #endif } void mqtt_connect(void) { #ifdef HAVE_MOSQUITTO_H char *id = NULL; char err[1024]; int rc; /* * Initialize mosquitto communication */ gethostname(my_hostname, 255); mosquitto_lib_init(); id = xstrcpy((char *)"thermferm/"); id = xstrcat(id, my_hostname); if (strlen(id) > MOSQ_MQTT_ID_MAX_LENGTH) { /* * Enforce maximum client id length of 23 characters */ id[MOSQ_MQTT_ID_MAX_LENGTH] = '\0'; } mosq = mosquitto_new(id, TRUE, NULL); if (!mosq) { switch(errno) { case ENOMEM: syslog(LOG_NOTICE, "MQTT: mosquitto_new: Out of memory"); break; case EINVAL: syslog(LOG_NOTICE, "MQTT: mosquitto_new: Invalid id"); break; } mosquitto_lib_cleanup(); return; } /* * Set our will */ if ((rc = mosquitto_will_set(mosq, topic_base((char *)"NDEATH"), 0, NULL, mqtt_qos, false))) { if (rc > MOSQ_ERR_SUCCESS) syslog(LOG_NOTICE, "MQTT: mosquitto_will_set: %s", mosquitto_strerror(rc)); mosquitto_lib_cleanup(); return; } if (debug) mosquitto_log_callback_set(mosq, my_log_callback); mosquitto_max_inflight_messages_set(mosq, max_inflight); mosquitto_connect_callback_set(mosq, my_connect_callback); mosquitto_disconnect_callback_set(mosq, my_disconnect_callback); mosquitto_publish_callback_set(mosq, my_publish_callback); mosquitto_message_callback_set(mosq, my_message_callback); mosquitto_subscribe_callback_set(mosq, my_subscribe_callback); if ((rc = mosquitto_connect(mosq, Config.mqtt_host, Config.mqtt_port, keepalive))) { if (rc == MOSQ_ERR_ERRNO) { strerror_r(errno, err, 1024); syslog(LOG_NOTICE, "MQTT: mosquitto_connect: error: %s", err); } else { syslog(LOG_NOTICE, "MQTT: mosquitto_connect: unable to connect (%d)", rc); } mosquitto_lib_cleanup(); syslog(LOG_NOTICE, "MQTT: will run without an MQTT broker."); } else { mqtt_use = TRUE; syslog(LOG_NOTICE, "MQTT: connected with %s:%d", Config.mqtt_host, Config.mqtt_port); /* * Initialise is complete, report our presence state */ mosquitto_loop_start(mosq); publishNData(true, 0); publishDBirthAll(); } #endif } void mqtt_disconnect(void) { #ifdef HAVE_MOSQUITTO_H int rc; if (mqtt_use) { /* * Final publish 0 to clients/<hostname>/thermferm/state * After that, remove the retained topic. */ syslog(LOG_NOTICE, "MQTT disconnecting"); publisher(mosq, topic_base((char *)"NBIRTH"), NULL, true); publisher(mosq, topic_base((char *)"NDEATH"), NULL, true); mqtt_last_mid = mqtt_mid_sent; mqtt_status = STATUS_WAITING; mqtt_my_shutdown = TRUE; do { if (mqtt_status == STATUS_WAITING) { if (debug) fprintf(stdout, (char *)"Waiting\n"); if (mqtt_last_mid_sent == mqtt_last_mid && mqtt_disconnect_sent == FALSE) { mosquitto_disconnect(mosq); mqtt_disconnect_sent = TRUE; } usleep(100000); } rc = MOSQ_ERR_SUCCESS; } while (rc == MOSQ_ERR_SUCCESS && mqtt_connected); mosquitto_loop_stop(mosq, FALSE); mosquitto_destroy(mosq); mosquitto_lib_cleanup(); mqtt_use = FALSE; syslog(LOG_NOTICE, "MQTT disconnected"); } #endif }