Sat, 19 Aug 2023 15:11:35 +0200
De iSpindel plato berekening is in het webscript gezet omdat er dan met meer cijfers achter de komman gerekend wordt. De uitkomst verschilt 0.25 plato! De calibratie is nu dus extern.
/***************************************************************************** * Copyright (C) 2017-2021 * * Michiel Broek <mbroek at mbse dot eu> * * This file is part of the bms (Brewery Management System) * * This is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the * Free Software Foundation; either version 2, or (at your option) any * later version. * * bms is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public License * along with ThermFerm; see the file COPYING. If not, write to the Free * Software Foundation, 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. *****************************************************************************/ #include "bms.h" #include "xutil.h" #include "mqtt.h" #include "nodes.h" #include "fermenters.h" #include "co2meters.h" #include "ispindels.h" extern sys_config Config; extern int debug; /* Global variables for use in callbacks. */ int mqtt_qos = 0; int mqtt_status = STATUS_CONNECTING; int mqtt_mid_sent = 0; int mqtt_last_mid = -1; int mqtt_last_mid_sent = -1; int mqtt_connected = true; int mqtt_disconnect_sent = false; int mqtt_connect_lost = false; int mqtt_my_shutdown = false; int mqtt_use = false; int keepalive = 60; unsigned int max_inflight = 20; struct mosquitto *mosq = NULL; char *state = NULL; char my_hostname[256]; int Sequence = 0; char *payload_header(void) { static char *tmp; char buf[128]; tmp = xstrcpy((char *)"{\"timestamp\":"); sprintf(buf, "%ld", time(NULL)); tmp = xstrcat(tmp, buf); tmp = xstrcat(tmp, (char *)",\"seq\":"); sprintf(buf, "%d", Sequence++); tmp = xstrcat(tmp, buf); tmp = xstrcat(tmp, (char *)",\"metric\":"); return tmp; } char *topic_base(char *msgtype) { static char *tmp; tmp = xstrcpy((char *)"mbv1.0/brewery/"); tmp = xstrcat(tmp, msgtype); tmp = xstrcat(tmp, (char *)"/"); tmp = xstrcat(tmp, my_hostname); return tmp; } void my_connect_callback(struct mosquitto *my_mosq, void *obj, int result) { char *topic = NULL; if (mqtt_connect_lost) { mqtt_connect_lost = false; syslog(LOG_NOTICE, "MQTT: reconnect: %s", mosquitto_connack_string(result)); } if (!result) { topic = topic_base((char *)"NCMD"); // TODO: do we need this?? topic = xstrcat(topic, (char *)"/#"); mosquitto_subscribe(mosq, NULL, topic, 0); free(topic); topic = xstrcpy((char *)"mbv1.0/fermenters/#"); // Subscribe to fermenter messages. mosquitto_subscribe(mosq, NULL, topic, 0); free(topic); topic = xstrcpy((char *)"mbv1.0/co2meters/#"); // Subscribe to co2meter messages. mosquitto_subscribe(mosq, NULL, topic, 0); free(topic); topic = xstrcpy((char *)"mbv1.0/ispindels/#"); // Subscribe to ispindel messages. mosquitto_subscribe(mosq, NULL, topic, 0); free(topic); topic = NULL; mqtt_status = STATUS_CONNACK_RECVD; } else { syslog(LOG_NOTICE, "MQTT: my_connect_callback: %s\n", mosquitto_connack_string(result)); } } void my_disconnect_callback(struct mosquitto *my_mosq, void *obj, int rc) { if (mqtt_my_shutdown) { syslog(LOG_NOTICE, "MQTT: acknowledged DISCONNECT from %s", Config.mqtt_host); mqtt_connected = false; } else { /* * The remote server was brought down. We must keep running */ syslog(LOG_NOTICE, "MQTT: received DISCONNECT from %s, connection lost", Config.mqtt_host); mqtt_connect_lost = true; } } void my_publish_callback(struct mosquitto *my_mosq, void *obj, int mid) { mqtt_last_mid_sent = mid; } void my_subscribe_callback(struct mosquitto *my_mosq, void *userdata, int mid, int qos_count, const int *granted_qos) { int i; syslog(LOG_NOTICE, "Subscribed (mid: %d): %d", mid, granted_qos[0]); for (i = 1; i < qos_count; i++) { syslog(LOG_NOTICE, " %d", granted_qos[i]); } } void my_log_callback(struct mosquitto *my_mosq, void *obj, int level, const char *str) { syslog(LOG_NOTICE, "MQTT: %s", str); if (debug) fprintf(stdout, "MQTT: %s\n", str); } void my_message_callback(struct mosquitto *my_mosq, void *userdata, const struct mosquitto_message *message) { if (message->payloadlen) { // TODO: process subscribed topics here. if (strstr(message->topic, (char *)"NBIRTH") || strstr(message->topic, (char *)"NDATA")) { node_birth_data(message->topic, (char *)message->payload); return; } if (strstr(message->topic, (char *)"fermenters") && (strstr(message->topic, (char *)"DBIRTH") || strstr(message->topic, (char *)"DDATA"))) { fermenter_birth_data(message->topic, (char *)message->payload); return; } if (strstr(message->topic, (char *)"fermenters") && strstr(message->topic, (char *)"DLOG")) { fermenter_log(message->topic, (char *)message->payload); return; } if (strstr(message->topic, (char *)"fermenters") && strstr(message->topic, (char *)"DCMD")) { return; // just ignore our own commands. } if (strstr(message->topic, (char *)"co2meters") && strstr(message->topic, (char *)"DBIRTH")) { co2meter_birth_data(message->topic, (char *)message->payload); return; } if (strstr(message->topic, (char *)"co2meters") && strstr(message->topic, (char *)"DLOG")) { co2meter_log(message->topic, (char *)message->payload); return; } if (strstr(message->topic, (char *)"ispindels") && strstr(message->topic, (char *)"DBIRTH")) { ispindel_birth_data(message->topic, (char *)message->payload); return; } syslog(LOG_NOTICE, "MQTT: message callback %s :: %d", message->topic, message->payloadlen); } else { if (strstr(message->topic, (char *)"NBIRTH")) { // Ignore ?? fprintf(stdout, "MQTT: %s NULL\n", message->topic); return; } if (strstr(message->topic, (char *)"NDEATH")) { node_death(message->topic); return; } if (strstr(message->topic, (char *)"fermenters") && strstr(message->topic, (char *)"DDEATH")) { fermenter_death(message->topic); return; } syslog(LOG_NOTICE, "MQTT: message callback %s (null)", message->topic); } } void publisher(struct mosquitto *my_mosq, char *topic, char *payload, bool retain) { // publish the data if (payload) mosquitto_publish(my_mosq, &mqtt_mid_sent, topic, strlen(payload), payload, mqtt_qos, retain); else mosquitto_publish(my_mosq, &mqtt_mid_sent, topic, 0, NULL, mqtt_qos, retain); } void mqtt_publish(char *topic, char *payload) { mosquitto_publish(mosq, &mqtt_mid_sent, topic, strlen(payload), payload, mqtt_qos, false); } void publishNData(bool birth, int flag) { char *topic = NULL, *payload = NULL; struct utsname ubuf; bool comma = false; payload = payload_header(); payload = xstrcat(payload, (char *)"{"); if (birth || flag & MQTT_NODE_CONTROL) { payload = xstrcat(payload, (char *)"\"nodecontrol\":{\"reboot\":false,\"rebirth\":false,\"nextserver\":false,\"scanrate\":3000}"); comma = true; } if (birth) { if (comma) payload = xstrcat(payload, (char *)","); payload = xstrcat(payload, (char *)"\"properties\":{\"hardwaremake\":\"Unknown\",\"hardwaremodel\":\"Unknown\""); if (uname(&ubuf) == 0) { payload = xstrcat(payload, (char *)",\"os\":\""); payload = xstrcat(payload, ubuf.sysname); payload = xstrcat(payload, (char *)"\",\"os_version\":\""); payload = xstrcat(payload, ubuf.release); payload = xstrcat(payload, (char *)"\""); } else { payload = xstrcat(payload, (char *)",\"os\":\"Unknown\",\"os_version\":\"Unknown\""); } payload = xstrcat(payload, (char *)",\"FW\":\""); payload = xstrcat(payload, (char *)VERSION); payload = xstrcat(payload, (char *)"\"}"); comma = true; } payload = xstrcat(payload, (char *)"}}"); if (birth) { topic = topic_base((char *)"NBIRTH"); publisher(mosq, topic, payload, true); } else { topic = topic_base((char *)"NDATA"); publisher(mosq, topic, payload, false); } free(payload); payload = NULL; free(topic); topic = NULL; } int mqtt_connect(void) { char *id = NULL, *topic = NULL; char err[1024]; int rc; /* * Initialize mosquitto communication */ gethostname(my_hostname, 255); mosquitto_lib_init(); id = xstrcpy((char *)"bmsd/"); id = xstrcat(id, my_hostname); if (strlen(id) > MOSQ_MQTT_ID_MAX_LENGTH) { /* * Enforce maximum client id length of 23 characters */ id[MOSQ_MQTT_ID_MAX_LENGTH] = '\0'; } mosq = mosquitto_new(id, true, NULL); if (!mosq) { switch(errno) { case ENOMEM: syslog(LOG_NOTICE, "MQTT: mosquitto_new: Out of memory"); break; case EINVAL: syslog(LOG_NOTICE, "MQTT: mosquitto_new: Invalid id"); break; } mosquitto_lib_cleanup(); return 1; } free(id); id = NULL; /* * Set our will */ topic = topic_base((char *)"NDEATH"); if ((rc = mosquitto_will_set(mosq, topic, 0, NULL, mqtt_qos, false))) { if (rc > MOSQ_ERR_SUCCESS) syslog(LOG_NOTICE, "MQTT: mosquitto_will_set: %s", mosquitto_strerror(rc)); mosquitto_lib_cleanup(); return 2; } free(topic); topic = NULL; if (debug) mosquitto_log_callback_set(mosq, my_log_callback); /* * Username/Password */ if (Config.mqtt_user) { if (Config.mqtt_pass) { rc = mosquitto_username_pw_set(mosq, Config.mqtt_user, Config.mqtt_pass); } else { rc = mosquitto_username_pw_set(mosq, Config.mqtt_user, NULL); } if (rc == MOSQ_ERR_INVAL) { syslog(LOG_NOTICE, "MQTT: mosquitto_username_pw_set: input parameters invalid"); } else if (rc == MOSQ_ERR_NOMEM) { syslog(LOG_NOTICE, "MQTT: mosquitto_username_pw_set: Out of Memory"); } if (rc != MOSQ_ERR_SUCCESS) { mosquitto_lib_cleanup(); return 3; } } mosquitto_max_inflight_messages_set(mosq, max_inflight); mosquitto_connect_callback_set(mosq, my_connect_callback); mosquitto_disconnect_callback_set(mosq, my_disconnect_callback); mosquitto_publish_callback_set(mosq, my_publish_callback); mosquitto_message_callback_set(mosq, my_message_callback); mosquitto_subscribe_callback_set(mosq, my_subscribe_callback); if ((rc = mosquitto_connect(mosq, Config.mqtt_host, Config.mqtt_port, keepalive))) { if (rc == MOSQ_ERR_ERRNO) { strerror_r(errno, err, 1024); syslog(LOG_NOTICE, "MQTT: mosquitto_connect: error: %s", err); } else { syslog(LOG_NOTICE, "MQTT: mosquitto_connect: unable to connect (%d)", rc); } mosquitto_lib_cleanup(); syslog(LOG_NOTICE, "MQTT: will run without an MQTT broker."); return 4; } else { mqtt_use = true; syslog(LOG_NOTICE, "MQTT: connected with %s:%d", Config.mqtt_host, Config.mqtt_port); /* * Initialise is complete, report our presence state */ mosquitto_loop_start(mosq); publishNData(true, 0); } return 0; } void mqtt_disconnect(void) { int rc; char *topic = NULL; if (mqtt_use) { /* * Final publish 0 to clients/<hostname>/bmsd/state * After that, remove the retained topic. */ syslog(LOG_NOTICE, "MQTT disconnecting"); topic = topic_base((char *)"NBIRTH"); publisher(mosq, topic, NULL, true); free(topic); topic = topic_base((char *)"NDEATH"); publisher(mosq, topic, NULL, true); free(topic); topic = NULL; mqtt_last_mid = mqtt_mid_sent; mqtt_status = STATUS_WAITING; mqtt_my_shutdown = true; do { if (mqtt_status == STATUS_WAITING) { if (debug) fprintf(stdout, (char *)"Waiting\n"); if (mqtt_last_mid_sent == mqtt_last_mid && mqtt_disconnect_sent == false) { mosquitto_disconnect(mosq); mqtt_disconnect_sent = true; } usleep(100000); } rc = MOSQ_ERR_SUCCESS; } while (rc == MOSQ_ERR_SUCCESS && mqtt_connected); mosquitto_loop_stop(mosq, false); mosquitto_destroy(mosq); mosquitto_lib_cleanup(); mqtt_use = false; mqtt_status = STATUS_CONNECTING; mqtt_mid_sent = 0; mqtt_last_mid = -1; mqtt_last_mid_sent = -1; mqtt_connected = true; mqtt_disconnect_sent = false; mqtt_connect_lost = false; mqtt_my_shutdown = false; syslog(LOG_NOTICE, "MQTT: disconnected"); } }