Wed, 18 Mar 2020 19:46:15 +0100
Limit debug ProtoDeficit error messages. If the need for a yeast starter changes, make sure the screen is updated. Another set of extra debug logs. Fixed try yeast starter. Several dropdown lists changed the action from change to select. This should fix te too early calculations while the data was not yet loaded.
/***************************************************************************** * Copyright (C) 2017-2020 * * Michiel Broek <mbroek at mbse dot eu> * * This file is part of the bms (Brewery Management System) * * This is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the * Free Software Foundation; either version 2, or (at your option) any * later version. * * bms is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public License * along with ThermFerm; see the file COPYING. If not, write to the Free * Software Foundation, 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. *****************************************************************************/ #include "bms.h" #include "xutil.h" #include "mqtt.h" #include "nodes.h" #include "fermenters.h" #include "co2meters.h" #include "ispindels.h" extern sys_config Config; extern int debug; /* Global variables for use in callbacks. */ int mqtt_qos = 0; int mqtt_status = STATUS_CONNECTING; int mqtt_mid_sent = 0; int mqtt_last_mid = -1; int mqtt_last_mid_sent = -1; int mqtt_connected = TRUE; int mqtt_disconnect_sent = FALSE; int mqtt_connect_lost = FALSE; int mqtt_my_shutdown = FALSE; int mqtt_use = FALSE; int keepalive = 60; unsigned int max_inflight = 20; struct mosquitto *mosq = NULL; char *state = NULL; char my_hostname[256]; int Sequence = 0; char *payload_header(void) { static char *tmp; char buf[128]; tmp = xstrcpy((char *)"{\"timestamp\":"); sprintf(buf, "%ld", time(NULL)); tmp = xstrcat(tmp, buf); tmp = xstrcat(tmp, (char *)",\"seq\":"); sprintf(buf, "%d", Sequence++); tmp = xstrcat(tmp, buf); tmp = xstrcat(tmp, (char *)",\"metric\":"); return tmp; } char *topic_base(char *msgtype) { static char *tmp; tmp = xstrcpy((char *)"mbv1.0/brewery/"); tmp = xstrcat(tmp, msgtype); tmp = xstrcat(tmp, (char *)"/"); tmp = xstrcat(tmp, my_hostname); return tmp; } void my_connect_callback(struct mosquitto *my_mosq, void *obj, int result) { char *topic = NULL; if (mqtt_connect_lost) { mqtt_connect_lost = FALSE; syslog(LOG_NOTICE, "MQTT: reconnect: %s", mosquitto_connack_string(result)); } if (!result) { topic = topic_base((char *)"NCMD"); // TODO: do we need this?? topic = xstrcat(topic, (char *)"/#"); mosquitto_subscribe(mosq, NULL, topic, 0); free(topic); topic = xstrcpy((char *)"mbv1.0/fermenters/#"); // Subscribe to fermenter messages. mosquitto_subscribe(mosq, NULL, topic, 0); free(topic); topic = xstrcpy((char *)"mbv1.0/co2meters/#"); // Subscribe to co2meter messages. mosquitto_subscribe(mosq, NULL, topic, 0); free(topic); topic = xstrcpy((char *)"mbv1.0/ispindels/#"); // Subscribe to ispindel messages. mosquitto_subscribe(mosq, NULL, topic, 0); free(topic); topic = NULL; mqtt_status = STATUS_CONNACK_RECVD; } else { syslog(LOG_NOTICE, "MQTT: my_connect_callback: %s\n", mosquitto_connack_string(result)); } } void my_disconnect_callback(struct mosquitto *my_mosq, void *obj, int rc) { if (mqtt_my_shutdown) { syslog(LOG_NOTICE, "MQTT: acknowledged DISCONNECT from %s", Config.mqtt_host); mqtt_connected = FALSE; } else { /* * The remote server was brought down. We must keep running */ syslog(LOG_NOTICE, "MQTT: received DISCONNECT from %s, connection lost", Config.mqtt_host); mqtt_connect_lost = TRUE; } } void my_publish_callback(struct mosquitto *my_mosq, void *obj, int mid) { mqtt_last_mid_sent = mid; } void my_subscribe_callback(struct mosquitto *my_mosq, void *userdata, int mid, int qos_count, const int *granted_qos) { int i; syslog(LOG_NOTICE, "Subscribed (mid: %d): %d", mid, granted_qos[0]); for (i = 1; i < qos_count; i++) { syslog(LOG_NOTICE, " %d", granted_qos[i]); } } void my_log_callback(struct mosquitto *my_mosq, void *obj, int level, const char *str) { syslog(LOG_NOTICE, "MQTT: %s", str); if (debug) fprintf(stdout, "MQTT: %s\n", str); } void my_message_callback(struct mosquitto *my_mosq, void *userdata, const struct mosquitto_message *message) { if (message->payloadlen) { // TODO: process subscribed topics here. if (strstr(message->topic, (char *)"NBIRTH") || strstr(message->topic, (char *)"NDATA")) { node_birth_data(message->topic, (char *)message->payload); return; } if (strstr(message->topic, (char *)"fermenters") && (strstr(message->topic, (char *)"DBIRTH") || strstr(message->topic, (char *)"DDATA"))) { fermenter_birth_data(message->topic, (char *)message->payload); return; } if (strstr(message->topic, (char *)"fermenters") && strstr(message->topic, (char *)"DLOG")) { fermenter_log(message->topic, (char *)message->payload); return; } if (strstr(message->topic, (char *)"fermenters") && strstr(message->topic, (char *)"DCMD")) { return; // just ignore our own commands. } if (strstr(message->topic, (char *)"co2meters") && strstr(message->topic, (char *)"DBIRTH")) { co2meter_birth_data(message->topic, (char *)message->payload); return; } if (strstr(message->topic, (char *)"co2meters") && strstr(message->topic, (char *)"DLOG")) { co2meter_log(message->topic, (char *)message->payload); return; } if (strstr(message->topic, (char *)"ispindels") && strstr(message->topic, (char *)"DBIRTH")) { ispindel_birth_data(message->topic, (char *)message->payload); return; } syslog(LOG_NOTICE, "MQTT: message callback %s :: %d", message->topic, message->payloadlen); } else { if (strstr(message->topic, (char *)"NBIRTH")) { // Ignore ?? fprintf(stdout, "MQTT: %s NULL\n", message->topic); return; } if (strstr(message->topic, (char *)"NDEATH")) { node_death(message->topic); return; } if (strstr(message->topic, (char *)"fermenters") && strstr(message->topic, (char *)"DDEATH")) { fermenter_death(message->topic); return; } syslog(LOG_NOTICE, "MQTT: message callback %s (null)", message->topic); } } void publisher(struct mosquitto *my_mosq, char *topic, char *payload, bool retain) { // publish the data if (payload) mosquitto_publish(my_mosq, &mqtt_mid_sent, topic, strlen(payload), payload, mqtt_qos, retain); else mosquitto_publish(my_mosq, &mqtt_mid_sent, topic, 0, NULL, mqtt_qos, retain); } void publishNData(bool birth, int flag) { char *topic = NULL, *payload = NULL; struct utsname ubuf; bool comma = false; payload = payload_header(); payload = xstrcat(payload, (char *)"{"); if (birth || flag & MQTT_NODE_CONTROL) { payload = xstrcat(payload, (char *)"\"nodecontrol\":{\"reboot\":false,\"rebirth\":false,\"nextserver\":false,\"scanrate\":3000}"); comma = true; } if (birth) { if (comma) payload = xstrcat(payload, (char *)","); payload = xstrcat(payload, (char *)"\"properties\":{\"hardwaremake\":\"Unknown\",\"hardwaremodel\":\"Unknown\""); if (uname(&ubuf) == 0) { payload = xstrcat(payload, (char *)",\"os\":\""); payload = xstrcat(payload, ubuf.sysname); payload = xstrcat(payload, (char *)"\",\"os_version\":\""); payload = xstrcat(payload, ubuf.release); payload = xstrcat(payload, (char *)"\""); } else { payload = xstrcat(payload, (char *)",\"os\":\"Unknown\",\"os_version\":\"Unknown\""); } payload = xstrcat(payload, (char *)",\"FW\":\""); payload = xstrcat(payload, (char *)VERSION); payload = xstrcat(payload, (char *)"\"}"); comma = true; } payload = xstrcat(payload, (char *)"}}"); if (birth) { topic = topic_base((char *)"NBIRTH"); publisher(mosq, topic, payload, true); } else { topic = topic_base((char *)"NDATA"); publisher(mosq, topic, payload, false); } free(payload); payload = NULL; free(topic); topic = NULL; } int mqtt_connect(void) { char *id = NULL, *topic = NULL; char err[1024]; int rc; /* * Initialize mosquitto communication */ gethostname(my_hostname, 255); mosquitto_lib_init(); id = xstrcpy((char *)"bmsd/"); id = xstrcat(id, my_hostname); if (strlen(id) > MOSQ_MQTT_ID_MAX_LENGTH) { /* * Enforce maximum client id length of 23 characters */ id[MOSQ_MQTT_ID_MAX_LENGTH] = '\0'; } mosq = mosquitto_new(id, TRUE, NULL); if (!mosq) { switch(errno) { case ENOMEM: syslog(LOG_NOTICE, "MQTT: mosquitto_new: Out of memory"); break; case EINVAL: syslog(LOG_NOTICE, "MQTT: mosquitto_new: Invalid id"); break; } mosquitto_lib_cleanup(); return 1; } free(id); id = NULL; /* * Set our will */ topic = topic_base((char *)"NDEATH"); if ((rc = mosquitto_will_set(mosq, topic, 0, NULL, mqtt_qos, false))) { if (rc > MOSQ_ERR_SUCCESS) syslog(LOG_NOTICE, "MQTT: mosquitto_will_set: %s", mosquitto_strerror(rc)); mosquitto_lib_cleanup(); return 2; } free(topic); topic = NULL; if (debug) mosquitto_log_callback_set(mosq, my_log_callback); /* * Username/Password */ if (Config.mqtt_user) { if (Config.mqtt_pass) { rc = mosquitto_username_pw_set(mosq, Config.mqtt_user, Config.mqtt_pass); } else { rc = mosquitto_username_pw_set(mosq, Config.mqtt_user, NULL); } if (rc == MOSQ_ERR_INVAL) { syslog(LOG_NOTICE, "MQTT: mosquitto_username_pw_set: input parameters invalid"); } else if (rc == MOSQ_ERR_NOMEM) { syslog(LOG_NOTICE, "MQTT: mosquitto_username_pw_set: Out of Memory"); } if (rc != MOSQ_ERR_SUCCESS) { mosquitto_lib_cleanup(); return 3; } } mosquitto_max_inflight_messages_set(mosq, max_inflight); mosquitto_connect_callback_set(mosq, my_connect_callback); mosquitto_disconnect_callback_set(mosq, my_disconnect_callback); mosquitto_publish_callback_set(mosq, my_publish_callback); mosquitto_message_callback_set(mosq, my_message_callback); mosquitto_subscribe_callback_set(mosq, my_subscribe_callback); if ((rc = mosquitto_connect(mosq, Config.mqtt_host, Config.mqtt_port, keepalive))) { if (rc == MOSQ_ERR_ERRNO) { strerror_r(errno, err, 1024); syslog(LOG_NOTICE, "MQTT: mosquitto_connect: error: %s", err); } else { syslog(LOG_NOTICE, "MQTT: mosquitto_connect: unable to connect (%d)", rc); } mosquitto_lib_cleanup(); syslog(LOG_NOTICE, "MQTT: will run without an MQTT broker."); return 4; } else { mqtt_use = TRUE; syslog(LOG_NOTICE, "MQTT: connected with %s:%d", Config.mqtt_host, Config.mqtt_port); /* * Initialise is complete, report our presence state */ mosquitto_loop_start(mosq); publishNData(true, 0); } return 0; } void mqtt_disconnect(void) { int rc; char *topic = NULL; if (mqtt_use) { /* * Final publish 0 to clients/<hostname>/bmsd/state * After that, remove the retained topic. */ syslog(LOG_NOTICE, "MQTT disconnecting"); topic = topic_base((char *)"NBIRTH"); publisher(mosq, topic, NULL, true); free(topic); topic = topic_base((char *)"NDEATH"); publisher(mosq, topic, NULL, true); free(topic); topic = NULL; mqtt_last_mid = mqtt_mid_sent; mqtt_status = STATUS_WAITING; mqtt_my_shutdown = TRUE; do { if (mqtt_status == STATUS_WAITING) { if (debug) fprintf(stdout, (char *)"Waiting\n"); if (mqtt_last_mid_sent == mqtt_last_mid && mqtt_disconnect_sent == FALSE) { mosquitto_disconnect(mosq); mqtt_disconnect_sent = TRUE; } usleep(100000); } rc = MOSQ_ERR_SUCCESS; } while (rc == MOSQ_ERR_SUCCESS && mqtt_connected); mosquitto_loop_stop(mosq, FALSE); mosquitto_destroy(mosq); mosquitto_lib_cleanup(); mqtt_use = FALSE; mqtt_status = STATUS_CONNECTING; mqtt_mid_sent = 0; mqtt_last_mid = -1; mqtt_last_mid_sent = -1; mqtt_connected = TRUE; mqtt_disconnect_sent = FALSE; mqtt_connect_lost = FALSE; mqtt_my_shutdown = FALSE; syslog(LOG_NOTICE, "MQTT: disconnected"); } }