Sun, 06 Jan 2019 23:30:17 +0100
Added products beerxml export.
/***************************************************************************** * Copyright (C) 2017-2018 * * Michiel Broek <mbroek at mbse dot eu> * * This file is part of the bms (Brewery Management System) * * This is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the * Free Software Foundation; either version 2, or (at your option) any * later version. * * bms is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public License * along with ThermFerm; see the file COPYING. If not, write to the Free * Software Foundation, 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. *****************************************************************************/ #include "bms.h" #include "xutil.h" #include "mqtt.h" #include "nodes.h" #include "fermenters.h" extern sys_config Config; extern int debug; /* Global variables for use in callbacks. */ int mqtt_qos = 0; int mqtt_status = STATUS_CONNECTING; int mqtt_mid_sent = 0; int mqtt_last_mid = -1; int mqtt_last_mid_sent = -1; int mqtt_connected = TRUE; int mqtt_disconnect_sent = FALSE; int mqtt_connect_lost = FALSE; int mqtt_my_shutdown = FALSE; int mqtt_use = FALSE; int keepalive = 60; unsigned int max_inflight = 20; struct mosquitto *mosq = NULL; char *state = NULL; char my_hostname[256]; int Sequence = 0; char *payload_header(void) { static char *tmp; char buf[128]; tmp = xstrcpy((char *)"{\"timestamp\":"); sprintf(buf, "%ld", time(NULL)); tmp = xstrcat(tmp, buf); tmp = xstrcat(tmp, (char *)",\"seq\":"); sprintf(buf, "%d", Sequence++); tmp = xstrcat(tmp, buf); tmp = xstrcat(tmp, (char *)",\"metric\":"); return tmp; } char *topic_base(char *msgtype) { static char *tmp; tmp = xstrcpy((char *)"mbv1.0/brewery/"); tmp = xstrcat(tmp, msgtype); tmp = xstrcat(tmp, (char *)"/"); tmp = xstrcat(tmp, my_hostname); return tmp; } void my_connect_callback(struct mosquitto *my_mosq, void *obj, int result) { char *topic = NULL; if (mqtt_connect_lost) { mqtt_connect_lost = FALSE; syslog(LOG_NOTICE, "MQTT: reconnect: %s", mosquitto_connack_string(result)); } if (!result) { topic = topic_base((char *)"NCMD"); // TODO: do we need this?? topic = xstrcat(topic, (char *)"/#"); mosquitto_subscribe(mosq, NULL, topic, 0); free(topic); topic = xstrcpy((char *)"mbv1.0/fermenters/#"); // Subscribe to fermenter messages. mosquitto_subscribe(mosq, NULL, topic, 0); free(topic); topic = xstrcpy((char *)"mbv1.0/brewcontrol/#"); // Subscribe to brewcontrol messages. mosquitto_subscribe(mosq, NULL, topic, 0); free(topic); topic = NULL; mqtt_status = STATUS_CONNACK_RECVD; } else { syslog(LOG_NOTICE, "MQTT: my_connect_callback: %s\n", mosquitto_connack_string(result)); } } void my_disconnect_callback(struct mosquitto *my_mosq, void *obj, int rc) { if (mqtt_my_shutdown) { syslog(LOG_NOTICE, "MQTT: acknowledged DISCONNECT from %s", Config.mqtt_host); mqtt_connected = FALSE; } else { /* * The remote server was brought down. We must keep running */ syslog(LOG_NOTICE, "MQTT: received DISCONNECT from %s, connection lost", Config.mqtt_host); mqtt_connect_lost = TRUE; } } void my_publish_callback(struct mosquitto *my_mosq, void *obj, int mid) { mqtt_last_mid_sent = mid; } void my_subscribe_callback(struct mosquitto *my_mosq, void *userdata, int mid, int qos_count, const int *granted_qos) { int i; syslog(LOG_NOTICE, "Subscribed (mid: %d): %d", mid, granted_qos[0]); for (i = 1; i < qos_count; i++) { syslog(LOG_NOTICE, " %d", granted_qos[i]); } } void my_log_callback(struct mosquitto *my_mosq, void *obj, int level, const char *str) { syslog(LOG_NOTICE, "MQTT: %s", str); if (debug) fprintf(stdout, "MQTT: %s\n", str); } void my_message_callback(struct mosquitto *my_mosq, void *userdata, const struct mosquitto_message *message) { if (message->payloadlen) { // TODO: process subscribed topics here. if (strstr(message->topic, (char *)"NBIRTH") || strstr(message->topic, (char *)"NDATA")) { node_birth_data(message->topic, (char *)message->payload); return; } if (strstr(message->topic, (char *)"fermenters") && (strstr(message->topic, (char *)"DBIRTH") || strstr(message->topic, (char *)"DDATA"))) { fermenter_birth_data(message->topic, (char *)message->payload); return; } if (strstr(message->topic, (char *)"fermenters") && strstr(message->topic, (char *)"DLOG")) { fermenter_log(message->topic, (char *)message->payload); return; } syslog(LOG_NOTICE, "MQTT: message callback %s :: %d", message->topic, message->payloadlen); } else { if (strstr(message->topic, (char *)"NBIRTH")) { // Ignore ?? fprintf(stdout, "MQTT: %s NULL\n", message->topic); return; } if (strstr(message->topic, (char *)"NDEATH")) { node_death(message->topic); return; } if (strstr(message->topic, (char *)"fermenters") && strstr(message->topic, (char *)"DDEATH")) { fermenter_death(message->topic); return; } syslog(LOG_NOTICE, "MQTT: message callback %s (null)", message->topic); } } void publisher(struct mosquitto *my_mosq, char *topic, char *payload, bool retain) { // publish the data if (payload) mosquitto_publish(my_mosq, &mqtt_mid_sent, topic, strlen(payload), payload, mqtt_qos, retain); else mosquitto_publish(my_mosq, &mqtt_mid_sent, topic, 0, NULL, mqtt_qos, retain); } void publishNData(bool birth, int flag) { char *topic = NULL, *payload = NULL; struct utsname ubuf; bool comma = false; payload = payload_header(); payload = xstrcat(payload, (char *)"{"); if (birth || flag & MQTT_NODE_CONTROL) { payload = xstrcat(payload, (char *)"\"nodecontrol\":{\"reboot\":false,\"rebirth\":false,\"nextserver\":false,\"scanrate\":3000}"); comma = true; } if (birth) { if (comma) payload = xstrcat(payload, (char *)","); payload = xstrcat(payload, (char *)"\"properties\":{\"hardwaremake\":\"Unknown\",\"hardwaremodel\":\"Unknown\""); if (uname(&ubuf) == 0) { payload = xstrcat(payload, (char *)",\"os\":\""); payload = xstrcat(payload, ubuf.sysname); payload = xstrcat(payload, (char *)"\",\"os_version\":\""); payload = xstrcat(payload, ubuf.release); payload = xstrcat(payload, (char *)"\""); } else { payload = xstrcat(payload, (char *)",\"os\":\"Unknown\",\"os_version\":\"Unknown\""); } payload = xstrcat(payload, (char *)",\"FW\":\""); payload = xstrcat(payload, (char *)VERSION); payload = xstrcat(payload, (char *)"\"}"); comma = true; } payload = xstrcat(payload, (char *)"}}"); if (birth) { topic = topic_base((char *)"NBIRTH"); publisher(mosq, topic, payload, true); } else { topic = topic_base((char *)"NDATA"); publisher(mosq, topic, payload, false); } free(payload); payload = NULL; free(topic); topic = NULL; } int mqtt_connect(void) { char *id = NULL, *topic = NULL; char err[1024]; int rc; /* * Initialize mosquitto communication */ gethostname(my_hostname, 255); mosquitto_lib_init(); id = xstrcpy((char *)"bmsd/"); id = xstrcat(id, my_hostname); if (strlen(id) > MOSQ_MQTT_ID_MAX_LENGTH) { /* * Enforce maximum client id length of 23 characters */ id[MOSQ_MQTT_ID_MAX_LENGTH] = '\0'; } mosq = mosquitto_new(id, TRUE, NULL); if (!mosq) { switch(errno) { case ENOMEM: syslog(LOG_NOTICE, "MQTT: mosquitto_new: Out of memory"); break; case EINVAL: syslog(LOG_NOTICE, "MQTT: mosquitto_new: Invalid id"); break; } mosquitto_lib_cleanup(); return 1; } free(id); id = NULL; /* * Set our will */ topic = topic_base((char *)"NDEATH"); if ((rc = mosquitto_will_set(mosq, topic, 0, NULL, mqtt_qos, false))) { if (rc > MOSQ_ERR_SUCCESS) syslog(LOG_NOTICE, "MQTT: mosquitto_will_set: %s", mosquitto_strerror(rc)); mosquitto_lib_cleanup(); return 2; } free(topic); topic = NULL; if (debug) mosquitto_log_callback_set(mosq, my_log_callback); /* * Username/Password */ if (Config.mqtt_user) { if (Config.mqtt_pass) { rc = mosquitto_username_pw_set(mosq, Config.mqtt_user, Config.mqtt_pass); } else { rc = mosquitto_username_pw_set(mosq, Config.mqtt_user, NULL); } if (rc == MOSQ_ERR_INVAL) { syslog(LOG_NOTICE, "MQTT: mosquitto_username_pw_set: input parameters invalid"); } else if (rc == MOSQ_ERR_NOMEM) { syslog(LOG_NOTICE, "MQTT: mosquitto_username_pw_set: Out of Memory"); } if (rc != MOSQ_ERR_SUCCESS) { mosquitto_lib_cleanup(); return 3; } } mosquitto_max_inflight_messages_set(mosq, max_inflight); mosquitto_connect_callback_set(mosq, my_connect_callback); mosquitto_disconnect_callback_set(mosq, my_disconnect_callback); mosquitto_publish_callback_set(mosq, my_publish_callback); mosquitto_message_callback_set(mosq, my_message_callback); mosquitto_subscribe_callback_set(mosq, my_subscribe_callback); if ((rc = mosquitto_connect(mosq, Config.mqtt_host, Config.mqtt_port, keepalive))) { if (rc == MOSQ_ERR_ERRNO) { strerror_r(errno, err, 1024); syslog(LOG_NOTICE, "MQTT: mosquitto_connect: error: %s", err); } else { syslog(LOG_NOTICE, "MQTT: mosquitto_connect: unable to connect (%d)", rc); } mosquitto_lib_cleanup(); syslog(LOG_NOTICE, "MQTT: will run without an MQTT broker."); return 4; } else { mqtt_use = TRUE; syslog(LOG_NOTICE, "MQTT: connected with %s:%d", Config.mqtt_host, Config.mqtt_port); /* * Initialise is complete, report our presence state */ mosquitto_loop_start(mosq); publishNData(true, 0); } return 0; } void mqtt_disconnect(void) { int rc; char *topic = NULL; if (mqtt_use) { /* * Final publish 0 to clients/<hostname>/bmsd/state * After that, remove the retained topic. */ syslog(LOG_NOTICE, "MQTT disconnecting"); topic = topic_base((char *)"NBIRTH"); publisher(mosq, topic, NULL, true); free(topic); topic = topic_base((char *)"NDEATH"); publisher(mosq, topic, NULL, true); free(topic); topic = NULL; mqtt_last_mid = mqtt_mid_sent; mqtt_status = STATUS_WAITING; mqtt_my_shutdown = TRUE; do { if (mqtt_status == STATUS_WAITING) { if (debug) fprintf(stdout, (char *)"Waiting\n"); if (mqtt_last_mid_sent == mqtt_last_mid && mqtt_disconnect_sent == FALSE) { mosquitto_disconnect(mosq); mqtt_disconnect_sent = TRUE; } usleep(100000); } rc = MOSQ_ERR_SUCCESS; } while (rc == MOSQ_ERR_SUCCESS && mqtt_connected); mosquitto_loop_stop(mosq, FALSE); mosquitto_destroy(mosq); mosquitto_lib_cleanup(); mqtt_use = FALSE; mqtt_status = STATUS_CONNECTING; mqtt_mid_sent = 0; mqtt_last_mid = -1; mqtt_last_mid_sent = -1; mqtt_connected = TRUE; mqtt_disconnect_sent = FALSE; mqtt_connect_lost = FALSE; mqtt_my_shutdown = FALSE; syslog(LOG_NOTICE, "MQTT: disconnected"); } }