Wed, 18 Oct 2023 10:06:11 +0200
Version 0.3.45. Removed all writing to ASCII logfiles in the webserver environment; logging now goes only to MySQL.
/**
 * @file websocket.c
 * @brief WebSockets interface
 * @author Michiel Broek <mbroek at mbse dot eu>
 *
 * Copyright (C) 2020
 *
 * Michiel Broek <mbroek at mbse dot eu>
 *
 * This file is part of the bms (Brewery Management System)
 *
 * This is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the
 * Free Software Foundation; either version 2, or (at your option) any
 * later version.
 *
 * bms is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with ThermFerm; see the file COPYING.  If not, write to the Free
 * Software Foundation, 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
 */

#include "bms.h"
#include "xutil.h"
#include "websocket.h"
#include "fermenters.h"
#include "co2meters.h"
#include "ispindels.h"
#include "nodes.h"
#include <libwebsockets.h>


extern sys_config       Config;
extern int              debug;
extern int              my_shutdown;

struct lws_context      *context;           /* Set by ws_loop(), NULL when no server context exists. */
int                     ws_clients = 0;     /* Number of currently established connections.          */
time_t                  last_msg = 0;       /* Time of the last successful ws_broadcast().           */
/*
 * NOTE(review): no initialisation of ws_mutex is visible in this file;
 * presumably it is initialised elsewhere or relies on zero-initialisation
 * being a valid mutex state on the target platform -- confirm.
 */
pthread_mutex_t         ws_mutex;


/*
 * Based on lws-mirror-protocol from libwebsockets v2.0.x
 * Debian ships v2.0.3, on Slackware we have 2.4.0 and there
 * are lots of changes in the api.
 */
#define MAX_MESSAGE_QUEUE       512     /* Ring buffer slots; the wrap mask below needs a power of two. */
#define WS_INBUF                2048    /* Largest received frame that is accepted, in bytes.           */
#define WS_PING_IDLE_SECS       45      /* Idle time after which ws_check() broadcasts a ping.          */


/*
 * one of these created for each message
 */
struct a_message {
    void        *payload;       /* is malloc'd, LWS_PRE bytes of headroom precede the text */
    size_t      len;            /* length of the text, without the LWS_PRE headroom        */
};

/*
 * Outgoing message queue shared by all connections. ws_broadcast() writes
 * at ringbuffer_head; every session keeps its own tail index.
 *
 * NOTE(review): ws_broadcast() modifies the queue while holding ws_mutex,
 * but callback_ws() reads it without taking that mutex -- confirm that this
 * is acceptable for the threads that call ws_broadcast().
 */
static struct a_message ringbuffer[MAX_MESSAGE_QUEUE];
static int              ringbuffer_head;



/**
 * @brief Test whether a string starts with a given prefix.
 * @param str The string to test.
 * @param prefix The prefix to look for.
 * @return Non-zero if str starts with prefix, else 0.
 */
static int ws_has_prefix(const char *str, const char *prefix)
{
    return strncmp(str, prefix, strlen(prefix)) == 0;
}



/**
 * @brief Protocol callback, handles the events of one websocket connection.
 * @param wsi The connection the event belongs to.
 * @param reason The event type.
 * @param user Per session data, a struct per_session_data__lws_mirror.
 * @param in Received data for LWS_CALLBACK_RECEIVE.
 * @param len Number of bytes in the received data.
 * @return 0 on success, -1 if writing to the client failed.
 */
static int callback_ws(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len)
{
    struct per_session_data__lws_mirror *pss = (struct per_session_data__lws_mirror *)user;
    int         n, m;
    size_t      slot;
    char        buf[WS_INBUF + 1];

    switch (reason) {

        case LWS_CALLBACK_ESTABLISHED:
                /* A new client only gets messages queued from now on. */
                ws_clients++;
                pss->ringbuffer_tail = ringbuffer_head;
                pss->wsi = wsi;
                break;

        case LWS_CALLBACK_PROTOCOL_DESTROY:
                syslog(LOG_NOTICE, "Websocket: protocol cleaning up");
                for (slot = 0; slot < sizeof ringbuffer / sizeof ringbuffer[0]; slot++) {
                    free(ringbuffer[slot].payload);
                    /*
                     * Clear the slot, else a later ws_broadcast() or a second
                     * destroy event would free the same pointer again.
                     */
                    ringbuffer[slot].payload = NULL;
                    ringbuffer[slot].len = 0;
                }
                break;

        case LWS_CALLBACK_SERVER_WRITEABLE:
                /* Send everything that was queued since this session last wrote. */
                while (pss->ringbuffer_tail != ringbuffer_head) {
                    m = (int)ringbuffer[pss->ringbuffer_tail].len;
                    n = lws_write(wsi, (unsigned char *)ringbuffer[pss->ringbuffer_tail].payload + LWS_PRE, m, LWS_WRITE_TEXT);
                    if (n < 0) {
                        syslog(LOG_NOTICE, "ws: ERROR %d writing", n);
                        return -1;
                    }
                    if (n < m) {
                        syslog(LOG_NOTICE, "ws: partial write %d vs %d", n, m);
                    }

                    if (pss->ringbuffer_tail == (MAX_MESSAGE_QUEUE - 1)) {
                        pss->ringbuffer_tail = 0;
                    } else {
                        pss->ringbuffer_tail++;
                    }

                    if (((ringbuffer_head - pss->ringbuffer_tail) & (MAX_MESSAGE_QUEUE - 1)) == (MAX_MESSAGE_QUEUE - 15)) {
                        lws_rx_flow_allow_all_protocol(lws_get_context(wsi), lws_get_protocol(wsi));
                    }

                    if (lws_send_pipe_choked(wsi)) {
                        /* Socket is full, ask for a new writeable event and stop for now. */
                        lws_callback_on_writable(wsi);
                        break;
                    }
                }
                break;

        case LWS_CALLBACK_RECEIVE:
                if (in == NULL || len == 0) {
                    break;
                }
                if (len > WS_INBUF) {
                    /* Does not fit in buf, a truncated JSON message is useless. */
                    syslog(LOG_NOTICE, "ws: received %lu bytes, more than the %d bytes buffer, dropped", (unsigned long)len, WS_INBUF);
                    break;
                }
                memcpy(buf, in, len);
                buf[len] = '\0';

                /* Dispatch on the start of the JSON message. */
                if (ws_has_prefix(buf, "{\"device\":\"fermenters\",")) {
                    fermenter_ws_receive(buf);
                } else if (ws_has_prefix(buf, "{\"device\":\"co2meters\",")) {
                    co2meter_ws_receive(buf);
                } else if (ws_has_prefix(buf, "{\"device\":\"ispindels\",")) {
                    ispindel_ws_receive(buf);
                } else if (ws_has_prefix(buf, "{\"node\":\"")) {
                    node_ws_receive(buf);
                }
                break;

        case LWS_CALLBACK_CLOSED:
                ws_clients--;
                break;

        default:
                break;
    }

    return 0;
}



/*
 * The only protocol served: callback, per session data size and
 * receive buffer size.
 */
static struct lws_protocols protocols[] = {
    { "bmsd-protocol", callback_ws, sizeof(struct per_session_data__lws_mirror), WS_INBUF },
    { NULL, NULL, 0, 0 } /* terminator */
};



/*
 * {"node":"host","group":"group","online":1,"lastseen":"datetime","temperature":20.5,"humidity":47,"ip":"ipaddr","rssi":-1}
 * {"device":"fermenters","node":"seaport","unit":"unit0","online":1,"mode":"FRIDGE","yeast_lo":12.0,"yeast_hi":24.0,"air":19.875,"beer":19.812,"chiller":1.500,"heater":100,"cooler":0,"fan":100,"light":0,"door":0,"sp_lo":17.0,"sp_hi":17.5,"alarm":0,"stage":"PRIMARY"}
 * {"device":"co2meters","node":"seaport","unit":"unit0","online":1,"mode":"ON","temperature":20.875,"pressure_channel":6,"pressure_voltage":0.834,"pressure_zero":0.110,"pressure_bar":2.3,"alarm":0}
 * {"device":"ispindels","node":"seaport","unit":"unit0","online":1,"mode":"ON","temperature":20.875,"angle":45.223,"battery":4.121,"gravity":14.832,"alarm":0}
 */

/**
 * @brief Queue a text message for all connected websocket clients.
 *        The message is copied, the caller keeps ownership of msg.
 *        If no memory is available the message is logged as dropped.
 * @param msg The nul terminated message, examples are shown above.
 */
void ws_broadcast(char *msg)
{
    size_t      len;
    void        *payload;
    int         err;

    if (msg == NULL) {
        return;
    }

    err = pthread_mutex_lock(&ws_mutex);
    if (err) {
        syslog(LOG_NOTICE, "ws_broadcast pthread_mutex_lock error %d", err);
        return;
    }

    /*
     * Build the new payload first, the slot and the head index are only
     * touched when the allocation succeeded.
     */
    len = strlen(msg);
    payload = malloc(LWS_PRE + len);
    if (payload == NULL) {
        syslog(LOG_NOTICE, "ws_broadcast out of memory, message dropped");
    } else {
        memcpy((char *)payload + LWS_PRE, msg, len);

        free(ringbuffer[ringbuffer_head].payload);
        ringbuffer[ringbuffer_head].payload = payload;
        ringbuffer[ringbuffer_head].len = len;

        if (ringbuffer_head == (MAX_MESSAGE_QUEUE - 1)) {
            ringbuffer_head = 0;
        } else {
            ringbuffer_head++;
        }

        /* Without a server context there is nobody to notify. */
        if (context != NULL) {
            lws_callback_on_writable_all_protocol(context, &protocols[0]);
        }
    }

    err = pthread_mutex_unlock(&ws_mutex);
    if (err) {
        syslog(LOG_NOTICE, "ws_broadcast pthread_mutex_unlock error %d", err);
    }

    if (payload != NULL) {
        last_msg = time(NULL);
    }
}



/*
 * Called every 5 seconds.
 */

/**
 * @brief Broadcast a ping message if nothing was broadcast for more
 *        than WS_PING_IDLE_SECS seconds.
 */
void ws_check(void)
{
    time_t      now = time(NULL);

    /* Compare as time_t, a cast to int would truncate the values. */
    if ((now - last_msg) > WS_PING_IDLE_SECS) {
        ws_broadcast("{\"ping\":1}");
    }
}



/**
 * @brief Thread function that runs the websocket server until my_shutdown
 *        is set or lws_service() returns an error.
 * @param threadid Not used.
 * @return (void *)0 after a normal stop, (void *)1 if the server context
 *         could not be created.
 */
void *ws_loop(void *threadid)
{
    struct lws_context_creation_info    info;
    int                                 rc = 0;

    (void)threadid;

    memset(&info, 0, sizeof info); /* otherwise uninitialized garbage */
    info.port = Config.websocket_port;
    info.protocols = protocols;
    info.gid = -1;
    info.uid = -1;
    info.keepalive_timeout = 900;
    info.options = LWS_SERVER_OPTION_VALIDATE_UTF8;

    context = lws_create_context(&info);
    if (context == NULL) {
        syslog(LOG_NOTICE, "libwebsocket_create_context() failed");
        return (void *)1;
    }
    syslog(LOG_NOTICE, "Websocket: server started port %d", info.port);

    /*
     * Loop forever until external shutdown variable is set.
     */
    while (rc >= 0 && ! my_shutdown) {
        rc = lws_service(context, 50);
    }

    lws_context_destroy(context);
    /* Do not leave a dangling pointer behind for ws_broadcast(). */
    context = NULL;
    syslog(LOG_NOTICE, "Websocket: server stopped");

    return (void *)0;
}