diff -r 638e7dd1d560 -r 4b54d6f79d25 bmsd/websocket.c --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/bmsd/websocket.c Mon May 11 17:32:08 2020 +0200 @@ -0,0 +1,246 @@ +/** + * @file websocket.c + * @brief WebSockets interface + * @author Michiel Broek + * + * Copyright (C) 2020 + * + * Michiel Broek + * + * This file is part of the bms (Brewery Management System) + * + * This is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2, or (at your option) any + * later version. + * + * bms is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with ThermFerm; see the file COPYING. If not, write to the Free + * Software Foundation, 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. + */ + +#include "bms.h" +#include "xutil.h" +#include "websocket.h" +#include + + +extern sys_config Config; +extern int debug; +extern int my_shutdown; + +struct lws_context *context; +int ws_clients = 0; +time_t last_msg = 0; +pthread_mutex_t ws_mutex; + + +/* + * Based on lws-mirror-protocol from libwebsockets v2.0.x + * Debian ships v2.0.3, on Slackware we have 2.4.0 and there + * are lots of changes in the api. + */ +#define MAX_MESSAGE_QUEUE 512 + +/* + * one of these created for each message + */ +struct a_message { + void *payload; /* is malloc'd */ + size_t len; +}; + + +static struct a_message ringbuffer[MAX_MESSAGE_QUEUE]; +static int ringbuffer_head; + + + +static int callback_ws(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) +{ + struct per_session_data__lws_mirror *pss = (struct per_session_data__lws_mirror *)user; + int n, m; + + switch (reason) { + + case LWS_CALLBACK_ESTABLISHED: { + ws_clients++; + syslog(LOG_NOTICE, "ws: new connection, total %d", ws_clients); + pss->ringbuffer_tail = ringbuffer_head; + pss->wsi = wsi; + break; + } + + case LWS_CALLBACK_PROTOCOL_DESTROY: + syslog(LOG_NOTICE, "ws: protocol cleaning up"); + for (n = 0; n < sizeof ringbuffer / sizeof ringbuffer[0]; n++) + if (ringbuffer[n].payload) + free(ringbuffer[n].payload); + break; + + case LWS_CALLBACK_SERVER_WRITEABLE: + while (pss->ringbuffer_tail != ringbuffer_head) { + m = ringbuffer[pss->ringbuffer_tail].len; + n = lws_write(wsi, (unsigned char *)ringbuffer[pss->ringbuffer_tail].payload + LWS_PRE, m, LWS_WRITE_TEXT); + if (n < 0) { + syslog(LOG_NOTICE, "ws: ERROR %d writing", n); + return -1; + } + if (n < m) + syslog(LOG_NOTICE, "ws: partial write %d vs %d", n, m); + + if (pss->ringbuffer_tail == (MAX_MESSAGE_QUEUE - 1)) + pss->ringbuffer_tail = 0; + else + pss->ringbuffer_tail++; + + if (((ringbuffer_head - pss->ringbuffer_tail) & (MAX_MESSAGE_QUEUE - 1)) == (MAX_MESSAGE_QUEUE - 15)) + lws_rx_flow_allow_all_protocol(lws_get_context(wsi), lws_get_protocol(wsi)); + + if (lws_send_pipe_choked(wsi)) { + lws_callback_on_writable(wsi); + break; + } + } + break; + + case LWS_CALLBACK_RECEIVE: + if (((ringbuffer_head - pss->ringbuffer_tail) & (MAX_MESSAGE_QUEUE - 1)) == (MAX_MESSAGE_QUEUE - 1)) { + syslog(LOG_NOTICE, "ws: dropping!"); + goto choke; + } + + if (ringbuffer[ringbuffer_head].payload) + free(ringbuffer[ringbuffer_head].payload); + + ringbuffer[ringbuffer_head].payload = malloc(LWS_PRE + len); + ringbuffer[ringbuffer_head].len = len; + memcpy((char *)ringbuffer[ringbuffer_head].payload + LWS_PRE, in, len); + if (ringbuffer_head == (MAX_MESSAGE_QUEUE - 1)) + ringbuffer_head = 0; + else + ringbuffer_head++; + + if (((ringbuffer_head - pss->ringbuffer_tail) & (MAX_MESSAGE_QUEUE - 1)) != (MAX_MESSAGE_QUEUE - 2)) + goto done; + +choke: + syslog(LOG_NOTICE, "ws: LWS_CALLBACK_RECEIVE: throttling"); + lws_rx_flow_control(wsi, 0); + +done: + lws_callback_on_writable_all_protocol(lws_get_context(wsi), lws_get_protocol(wsi)); + break; + + case LWS_CALLBACK_CLOSED: + ws_clients--; + syslog(LOG_NOTICE, "ws: connection closed, left %d", ws_clients); + break; + + default: + break; + } + + return 0; +} + + + +static struct lws_protocols protocols[] = { + { "bmsd-protocol", callback_ws, sizeof(struct per_session_data__lws_mirror), 128 }, + { NULL, NULL, 0, 0 } /* terminator */ +}; + + +/* + * {"device":"fermenter","node":"seaport","unit":"unit0","online":1,"mode":"FRIDGE","yeast_lo":12.0,"yeast_hi":24.0,"air":19.875,"beer":19.812,"chiller":1.500,"heater":100,"cooler":0,"fan":100,"light":0,"door":0,"sp_lo":17.0,"sp_hi":17.5,"alarm":0,"stage":"PRIMARY"} + */ +void ws_broadcast(char *msg) +{ + int len, err; + + err = pthread_mutex_lock(&ws_mutex); + if (err) { + syslog(LOG_NOTICE, "ws_broadcast pthread_mutex_lock error %d", err); + } else { + + len = strlen(msg); + if (ringbuffer[ringbuffer_head].payload) + free(ringbuffer[ringbuffer_head].payload); + + ringbuffer[ringbuffer_head].payload = malloc(LWS_PRE + len); + ringbuffer[ringbuffer_head].len = len; + memcpy((char *)ringbuffer[ringbuffer_head].payload + LWS_PRE, msg, len); + if (ringbuffer_head == (MAX_MESSAGE_QUEUE - 1)) + ringbuffer_head = 0; + else + ringbuffer_head++; + +// syslog(LOG_NOTICE, "ws: %d %s", ringbuffer_head, msg); + syslog(LOG_NOTICE, "ws: broadcast buffer=%d len=%d", ringbuffer_head, len); + + lws_callback_on_writable_all_protocol(context, &protocols[0]); + err = pthread_mutex_unlock(&ws_mutex); + if (err) { + syslog(LOG_NOTICE, "ws_broadcast pthread_mutex_unlock error %d", err); + } + last_msg = time(NULL); + } +} + + + +/* + * Called every 5 seconds. + */ +void ws_check(void) +{ + time_t now = time(NULL); + + if (((int)now - (int)last_msg) > 45) { + ws_broadcast((char *)"{\"ping\":1}"); + } +} + + + +void *ws_loop(void *threadid) +{ + struct lws_context_creation_info info; + int n = 0; + + syslog(LOG_NOTICE, "Thread ws_loop started"); + + memset(&info, 0, sizeof info); /* otherwise uninitialized garbage */ + info.port = 8010; + info.protocols = protocols; + info.gid = -1; + info.uid = -1; + info.keepalive_timeout = 900; + info.options = LWS_SERVER_OPTION_VALIDATE_UTF8; + + context = lws_create_context(&info); + + if (context == NULL) { + syslog(LOG_NOTICE, "libwebsocket_create_context() failed"); + } + + /* + * Loop forever until external shutdown variable is set. + */ + while (n >= 0 && ! my_shutdown) { + + n = lws_service(context, 50); + } + lws_context_destroy(context); + + syslog(LOG_NOTICE, "Thread ws_loop stopped"); + return 0; +} + +