Tue, 31 Aug 2021 20:48:37 +0200
Split batch, adjust mash step volume. In the duplicated log_brew handle the missing values. In save product, round the mash step sg to 4 decimals. In prod_edit, ingredients are stored as strings, not arrays. This triggered a memory corruption that only happened in rare circumstances. Don't fix mash step fields in the javascript, it is already done during load from the database. Calculation of the mash volume is rounded to 6 decimals. Enter mash step Brix/Plato value, the SG result is rounded to 4 decimals.
/**
 * @file websocket.c
 * @brief WebSockets interface
 * @author Michiel Broek <mbroek at mbse dot eu>
 *
 * Copyright (C) 2020
 *
 * Michiel Broek <mbroek at mbse dot eu>
 *
 * This file is part of the bms (Brewery Management System)
 *
 * This is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the
 * Free Software Foundation; either version 2, or (at your option) any
 * later version.
 *
 * bms is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with ThermFerm; see the file COPYING. If not, write to the Free
 * Software Foundation, 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
 */

#include "bms.h"
#include "xutil.h"
#include "websocket.h"
#include "fermenters.h"
#include "co2meters.h"
#include "ispindels.h"
#include <libwebsockets.h>

/* Defined elsewhere in the project. */
extern sys_config Config;       /* ws_loop() reads Config.websocket_port */
extern int debug;
extern int my_shutdown;         /* non-zero ends the service loop in ws_loop() */

struct lws_context *context;    /* assigned by ws_loop(), passed to lws by ws_broadcast() */
int ws_clients = 0;             /* incremented on ESTABLISHED, decremented on CLOSED */
time_t last_msg = 0;            /* set by ws_broadcast(), compared against now in ws_check() */
pthread_mutex_t ws_mutex;       /* locked by ws_broadcast(); NOTE(review): no init call in this file, confirm it is initialised elsewhere */

/*
 * Based on lws-mirror-protocol from libwebsockets v2.0.x
 * Debian ships v2.0.3, on Slackware we have 2.4.0 and there
 * are lots of changes in the api.
*/ #define MAX_MESSAGE_QUEUE 512 #define WS_INBUF 2048 /* * one of these created for each message */ struct a_message { void *payload; /* is malloc'd */ size_t len; }; static struct a_message ringbuffer[MAX_MESSAGE_QUEUE]; static int ringbuffer_head; static int callback_ws(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) { struct per_session_data__lws_mirror *pss = (struct per_session_data__lws_mirror *)user; int n, m; char buf[WS_INBUF + 1]; switch (reason) { case LWS_CALLBACK_ESTABLISHED: { ws_clients++; pss->ringbuffer_tail = ringbuffer_head; pss->wsi = wsi; break; } case LWS_CALLBACK_PROTOCOL_DESTROY: syslog(LOG_NOTICE, "Websocket: protocol cleaning up"); for (n = 0; n < sizeof ringbuffer / sizeof ringbuffer[0]; n++) if (ringbuffer[n].payload) free(ringbuffer[n].payload); break; case LWS_CALLBACK_SERVER_WRITEABLE: while (pss->ringbuffer_tail != ringbuffer_head) { m = ringbuffer[pss->ringbuffer_tail].len; n = lws_write(wsi, (unsigned char *)ringbuffer[pss->ringbuffer_tail].payload + LWS_PRE, m, LWS_WRITE_TEXT); if (n < 0) { syslog(LOG_NOTICE, "ws: ERROR %d writing", n); return -1; } if (n < m) syslog(LOG_NOTICE, "ws: partial write %d vs %d", n, m); if (pss->ringbuffer_tail == (MAX_MESSAGE_QUEUE - 1)) pss->ringbuffer_tail = 0; else pss->ringbuffer_tail++; if (((ringbuffer_head - pss->ringbuffer_tail) & (MAX_MESSAGE_QUEUE - 1)) == (MAX_MESSAGE_QUEUE - 15)) lws_rx_flow_allow_all_protocol(lws_get_context(wsi), lws_get_protocol(wsi)); if (lws_send_pipe_choked(wsi)) { lws_callback_on_writable(wsi); break; } } break; case LWS_CALLBACK_RECEIVE: memcpy(buf, in, len); buf[len] = '\0'; // syslog(LOG_NOTICE, "ws: reveived %ld bytes %s", len, buf); if (strncmp(buf, (char *)"{\"device\":\"fermenters\",", 23) == 0) { fermenter_ws_receive(buf); } else if (strncmp(buf, (char *)"{\"device\":\"co2meters\",", 22) == 0) { co2meter_ws_receive(buf); } else if (strncmp(buf, (char *)"{\"device\":\"ispindels\",", 22) == 0) { ispindel_ws_receive(buf); } 
break; case LWS_CALLBACK_CLOSED: ws_clients--; break; default: break; } return 0; } static struct lws_protocols protocols[] = { { "bmsd-protocol", callback_ws, sizeof(struct per_session_data__lws_mirror), WS_INBUF }, { NULL, NULL, 0, 0 } /* terminator */ }; /* * {"node":"host","group":"group","online":1,"lastseen":"datetime","temperature":20.5,"humidity":47,"ip":"ipaddr","rssi":-1} * {"device":"fermenters","node":"seaport","unit":"unit0","online":1,"mode":"FRIDGE","yeast_lo":12.0,"yeast_hi":24.0,"air":19.875,"beer":19.812,"chiller":1.500,"heater":100,"cooler":0,"fan":100,"light":0,"door":0,"sp_lo":17.0,"sp_hi":17.5,"alarm":0,"stage":"PRIMARY"} * {"device":"co2meters","node":"seaport","unit":"unit0","online":1,"mode":"ON","temperature":20.875,"pressure_channel":6,"pressure_voltage":0.834,"pressure_zero":0.110,"pressure_bar":2.3,"alarm":0} * {"device":"ispindels","node":"seaport","unit":"unit0","online":1,"mode":"ON","temperature":20.875,"angle":45.223,"battery":4.121,"gravity":14.832,"alarm":0} */ void ws_broadcast(char *msg) { int len, err; err = pthread_mutex_lock(&ws_mutex); if (err) { syslog(LOG_NOTICE, "ws_broadcast pthread_mutex_lock error %d", err); } else { len = strlen(msg); if (ringbuffer[ringbuffer_head].payload) free(ringbuffer[ringbuffer_head].payload); ringbuffer[ringbuffer_head].payload = malloc(LWS_PRE + len); ringbuffer[ringbuffer_head].len = len; memcpy((char *)ringbuffer[ringbuffer_head].payload + LWS_PRE, msg, len); if (ringbuffer_head == (MAX_MESSAGE_QUEUE - 1)) ringbuffer_head = 0; else ringbuffer_head++; // syslog(LOG_NOTICE, "ws: broadcast buffer=%d len=%d", ringbuffer_head, len); lws_callback_on_writable_all_protocol(context, &protocols[0]); err = pthread_mutex_unlock(&ws_mutex); if (err) { syslog(LOG_NOTICE, "ws_broadcast pthread_mutex_unlock error %d", err); } last_msg = time(NULL); } } /* * Called every 5 seconds. 
*/ void ws_check(void) { time_t now = time(NULL); if (((int)now - (int)last_msg) > 45) { ws_broadcast((char *)"{\"ping\":1}"); } } void *ws_loop(void *threadid) { struct lws_context_creation_info info; int n = 0; memset(&info, 0, sizeof info); /* otherwise uninitialized garbage */ info.port = Config.websocket_port; info.protocols = protocols; info.gid = -1; info.uid = -1; info.keepalive_timeout = 900; info.options = LWS_SERVER_OPTION_VALIDATE_UTF8; context = lws_create_context(&info); if (context == NULL) { syslog(LOG_NOTICE, "libwebsocket_create_context() failed"); return (void *)1; } syslog(LOG_NOTICE, "Websocket: server started port %d", info.port); /* * Loop forever until external shutdown variable is set. */ while (n >= 0 && ! my_shutdown) { n = lws_service(context, 50); } lws_context_destroy(context); syslog(LOG_NOTICE, "Websocket: server stopped"); return (void *)0; }