diff -r 6cabc02f4c8d -r 825210ba2707 thermferm/websocket.c --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/thermferm/websocket.c Sat Apr 13 16:50:26 2024 +0200 @@ -0,0 +1,246 @@ +/** + * @file websocket.c + * @brief WebSockets interface + * @author Michiel Broek + * + * Copyright (C) 2024 + * + * Michiel Broek + * + * This file is part of the mbsePi-apps + * + * This is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2, or (at your option) any + * later version. + * + * bms is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with ThermFerm; see the file COPYING. If not, write to the Free + * Software Foundation, 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. + */ + +#include "thermferm.h" +#include "xutil.h" +#include "websocket.h" +#include + + +extern sys_config Config; +extern int debug; + +int my_ws_shutdown = 0; +int my_ws_state = 0; +struct lws_context *context; +int ws_clients = 0; +time_t last_msg = 0; +pthread_mutex_t ws_mutex; + + +/* + * Based on lws-mirror-protocol + */ +#define MAX_MESSAGE_QUEUE 512 + +#define WS_INBUF 2048 + + +/* + * one of these created for each message + */ +struct a_message { + void *payload; /* is malloc'd */ + size_t len; +}; + + +static struct a_message ringbuffer[MAX_MESSAGE_QUEUE]; +static int ringbuffer_head; + + + +static int callback_ws(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) +{ + struct per_session_data__lws_mirror *pss = (struct per_session_data__lws_mirror *)user; + int n, m; + char buf[WS_INBUF + 1]; + + switch (reason) { + + case LWS_CALLBACK_ESTABLISHED: { + ws_clients++; + pss->ringbuffer_tail = ringbuffer_head; + pss->wsi = wsi; + break; + } + + case LWS_CALLBACK_PROTOCOL_DESTROY: + syslog(LOG_NOTICE, "Websocket: protocol cleaning up"); + for (n = 0; n < sizeof ringbuffer / sizeof ringbuffer[0]; n++) + if (ringbuffer[n].payload) + free(ringbuffer[n].payload); + break; + + case LWS_CALLBACK_SERVER_WRITEABLE: + while (pss->ringbuffer_tail != ringbuffer_head) { + m = ringbuffer[pss->ringbuffer_tail].len; + n = lws_write(wsi, (unsigned char *)ringbuffer[pss->ringbuffer_tail].payload + LWS_PRE, m, LWS_WRITE_TEXT); + if (n < 0) { + syslog(LOG_NOTICE, "ws: ERROR %d writing", n); + return -1; + } + if (n < m) + syslog(LOG_NOTICE, "ws: partial write %d vs %d", n, m); + + if (pss->ringbuffer_tail == (MAX_MESSAGE_QUEUE - 1)) + pss->ringbuffer_tail = 0; + else + pss->ringbuffer_tail++; + + if (((ringbuffer_head - pss->ringbuffer_tail) & (MAX_MESSAGE_QUEUE - 1)) == (MAX_MESSAGE_QUEUE - 15)) + lws_rx_flow_allow_all_protocol(lws_get_context(wsi), lws_get_protocol(wsi)); + + if (lws_send_pipe_choked(wsi)) { + lws_callback_on_writable(wsi); + break; + } + } + break; + + case LWS_CALLBACK_RECEIVE: + + memcpy(buf, in, len); + buf[len] = '\0'; + // syslog(LOG_NOTICE, "ws: reveived %ld bytes %s", len, buf); + /* + * These are send by bmsapp to bmsd. Then bmsd resends these via MQTT. + * Do we want to change that? Or use it for the new web pages. + * {"node":"rpi01","group_id":"fermenters","control":"reboot"} + * {"node":"rpi01","group_id":"fermenters","control":"rebirth"} + */ +// if (strncmp(buf, (char *)"{\"device\":\"fermenters\",", 23) == 0) { +// fermenter_ws_receive(buf); +// } else if (strncmp(buf, (char *)"{\"device\":\"co2meters\",", 22) == 0) { +// co2meter_ws_receive(buf); +// } else if (strncmp(buf, (char *)"{\"device\":\"ispindels\",", 22) == 0) { +// ispindel_ws_receive(buf); +// } else if (strncmp(buf, (char *)"{\"node\":\"", 9) == 0) { +// node_ws_receive(buf); +// } + + break; + + case LWS_CALLBACK_CLOSED: + ws_clients--; + break; + + default: + break; + } + + return 0; +} + + + +static struct lws_protocols protocols[] = { + { "thermferm-protocol", callback_ws, sizeof(struct per_session_data__lws_mirror), WS_INBUF }, + { NULL, NULL, 0, 0 } /* terminator */ +}; + + +/* + * {"node":"host","group":"group","online":1,"lastseen":"datetime","temperature":20.5,"humidity":47,"ip":"ipaddr","rssi":-1} + * {"device":"fermenters","node":"seaport","unit":"unit0","online":1,"mode":"FRIDGE","yeast_lo":12.0,"yeast_hi":24.0,"air":19.875,"beer":19.812,"chiller":1.500,"heater":100,"cooler":0,"fan":100,"light":0,"door":0,"sp_lo":17.0,"sp_hi":17.5,"alarm":0,"stage":"PRIMARY"} + */ +void ws_broadcast(char *msg) +{ + int len, err; + + syslog(LOG_NOTICE, "%s", msg); + err = pthread_mutex_lock(&ws_mutex); + if (err) { + syslog(LOG_NOTICE, "ws_broadcast pthread_mutex_lock error %d", err); + } else { + + len = strlen(msg); + if (ringbuffer[ringbuffer_head].payload) + free(ringbuffer[ringbuffer_head].payload); + + ringbuffer[ringbuffer_head].payload = malloc(LWS_PRE + len); + ringbuffer[ringbuffer_head].len = len; + memcpy((char *)ringbuffer[ringbuffer_head].payload + LWS_PRE, msg, len); + if (ringbuffer_head == (MAX_MESSAGE_QUEUE - 1)) + ringbuffer_head = 0; + else + ringbuffer_head++; + +// syslog(LOG_NOTICE, "ws: broadcast buffer=%d len=%d", ringbuffer_head, len); + + lws_callback_on_writable_all_protocol(context, &protocols[0]); + err = pthread_mutex_unlock(&ws_mutex); + if (err) { + syslog(LOG_NOTICE, "ws_broadcast pthread_mutex_unlock error %d", err); + } + last_msg = time(NULL); + } +} + + + +/* + * Called every 5 seconds. + */ +void ws_check(void) +{ + time_t now = time(NULL); + + if (((int)now - (int)last_msg) > 45) { + ws_broadcast((char *)"{\"ping\":1}"); + } +} + + + +void *my_ws_loop(void *threadid) +{ + struct lws_context_creation_info info; + int n = 0; + + my_ws_state = 1; + memset(&info, 0, sizeof info); /* otherwise uninitialized garbage */ + info.port = Config.websocket_port; + info.protocols = protocols; + info.gid = -1; + info.uid = -1; + info.keepalive_timeout = 900; + info.options = LWS_SERVER_OPTION_VALIDATE_UTF8; + + context = lws_create_context(&info); + + if (context == NULL) { + syslog(LOG_NOTICE, "libwebsocket_create_context() failed"); + my_ws_state = 0; + return (void *)1; + } + syslog(LOG_NOTICE, "Websocket: server started port %d", info.port); + + /* + * Loop forever until external shutdown variable is set. + */ + while (n >= 0 && ! my_ws_shutdown) { + + n = lws_service(context, 50); + } + lws_context_destroy(context); + + my_ws_state = 0; + syslog(LOG_NOTICE, "Websocket: server stopped"); + return (void *)0; +} + +