bmsd/websocket.c

changeset 671
4b54d6f79d25
child 672
23f959713fcb
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/bmsd/websocket.c	Mon May 11 17:32:08 2020 +0200
@@ -0,0 +1,246 @@
+/**
+ * @file websocket.c
+ * @brief WebSockets interface
+ * @author Michiel Broek <mbroek at mbse dot eu>
+ *
+ * Copyright (C) 2020
+ *
+ * Michiel Broek <mbroek at mbse dot eu>
+ *
+ * This file is part of the bms (Brewery Management System)
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; either version 2, or (at your option) any
+ * later version.
+ *
+ * bms is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with ThermFerm; see the file COPYING.  If not, write to the Free
+ * Software Foundation, 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
+ */
+
+#include "bms.h"
+#include "xutil.h"
+#include "websocket.h"
+#include <libwebsockets.h>
+
+
+extern sys_config       Config;
+extern int		debug;
+extern int		my_shutdown;
+
+struct			lws_context *context;
+int			ws_clients = 0;
+time_t			last_msg = 0;
+pthread_mutex_t		ws_mutex;
+
+
+/*
+ * Based on lws-mirror-protocol from libwebsockets v2.0.x
+ * Debian ships v2.0.3, on Slackware we have 2.4.0 and there
+ * are lots of changes in the api.
+ */
+#define MAX_MESSAGE_QUEUE 512
+
+/*
+ * one of these created for each message
+ */
+struct a_message {
+    void		*payload; /* is malloc'd */
+    size_t		len;
+};
+
+
+static struct a_message ringbuffer[MAX_MESSAGE_QUEUE];
+static int ringbuffer_head;
+
+
+
+static int callback_ws(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len)
+{
+    struct per_session_data__lws_mirror *pss = (struct per_session_data__lws_mirror *)user;
+    int n, m;
+
+    switch (reason) {
+
+	case LWS_CALLBACK_ESTABLISHED: {
+		ws_clients++;
+		syslog(LOG_NOTICE, "ws: new connection, total %d", ws_clients);
+		pss->ringbuffer_tail = ringbuffer_head;
+                pss->wsi = wsi;
+		break;
+	}
+
+	case LWS_CALLBACK_PROTOCOL_DESTROY:
+		syslog(LOG_NOTICE, "ws: protocol cleaning up");
+                for (n = 0; n < sizeof ringbuffer / sizeof ringbuffer[0]; n++)
+                    if (ringbuffer[n].payload)
+                	free(ringbuffer[n].payload);
+                break;
+
+        case LWS_CALLBACK_SERVER_WRITEABLE:
+                while (pss->ringbuffer_tail != ringbuffer_head) {
+                    m = ringbuffer[pss->ringbuffer_tail].len;
+                    n = lws_write(wsi, (unsigned char *)ringbuffer[pss->ringbuffer_tail].payload + LWS_PRE, m, LWS_WRITE_TEXT);
+                    if (n < 0) {
+                        syslog(LOG_NOTICE, "ws: ERROR %d writing", n);
+                        return -1;
+                    }
+                    if (n < m)
+                        syslog(LOG_NOTICE, "ws: partial write %d vs %d", n, m);
+
+                    if (pss->ringbuffer_tail == (MAX_MESSAGE_QUEUE - 1))
+                        pss->ringbuffer_tail = 0;
+                    else
+                        pss->ringbuffer_tail++;
+
+                    if (((ringbuffer_head - pss->ringbuffer_tail) & (MAX_MESSAGE_QUEUE - 1)) == (MAX_MESSAGE_QUEUE - 15))
+                        lws_rx_flow_allow_all_protocol(lws_get_context(wsi), lws_get_protocol(wsi));
+
+                    if (lws_send_pipe_choked(wsi)) {
+                        lws_callback_on_writable(wsi);
+                        break;
+                    }
+                }
+                break;
+
+	case LWS_CALLBACK_RECEIVE:
+                if (((ringbuffer_head - pss->ringbuffer_tail) & (MAX_MESSAGE_QUEUE - 1)) == (MAX_MESSAGE_QUEUE - 1)) {
+                    syslog(LOG_NOTICE, "ws: dropping!");
+                    goto choke;
+                }
+
+                if (ringbuffer[ringbuffer_head].payload)
+                    free(ringbuffer[ringbuffer_head].payload);
+
+                ringbuffer[ringbuffer_head].payload = malloc(LWS_PRE + len);
+                ringbuffer[ringbuffer_head].len = len;
+                memcpy((char *)ringbuffer[ringbuffer_head].payload + LWS_PRE, in, len);
+                if (ringbuffer_head == (MAX_MESSAGE_QUEUE - 1))
+                    ringbuffer_head = 0;
+                else
+                    ringbuffer_head++;
+
+                if (((ringbuffer_head - pss->ringbuffer_tail) & (MAX_MESSAGE_QUEUE - 1)) != (MAX_MESSAGE_QUEUE - 2))
+                    goto done;
+
+choke:
+                syslog(LOG_NOTICE, "ws: LWS_CALLBACK_RECEIVE: throttling");
+                lws_rx_flow_control(wsi, 0);
+
+done:
+                lws_callback_on_writable_all_protocol(lws_get_context(wsi), lws_get_protocol(wsi));
+                break;
+
+	case LWS_CALLBACK_CLOSED:
+		ws_clients--;
+		syslog(LOG_NOTICE, "ws: connection closed, left %d", ws_clients);
+		break;
+
+	default:
+		break;
+  }
+
+  return 0;
+}
+
+
+
+static struct lws_protocols protocols[] = {
+	{ "bmsd-protocol", callback_ws, sizeof(struct per_session_data__lws_mirror), 128 },
+        { NULL, NULL, 0, 0 } /* terminator */
+};
+
+
+/*
+ *  {"device":"fermenter","node":"seaport","unit":"unit0","online":1,"mode":"FRIDGE","yeast_lo":12.0,"yeast_hi":24.0,"air":19.875,"beer":19.812,"chiller":1.500,"heater":100,"cooler":0,"fan":100,"light":0,"door":0,"sp_lo":17.0,"sp_hi":17.5,"alarm":0,"stage":"PRIMARY"}
+ */
+void ws_broadcast(char *msg)
+{
+    int         len, err;
+
+    err = pthread_mutex_lock(&ws_mutex);
+    if (err) {
+	syslog(LOG_NOTICE, "ws_broadcast pthread_mutex_lock error %d", err);
+    } else {
+
+    	len = strlen(msg);
+    	if (ringbuffer[ringbuffer_head].payload)
+            free(ringbuffer[ringbuffer_head].payload);
+
+    	ringbuffer[ringbuffer_head].payload = malloc(LWS_PRE + len);
+    	ringbuffer[ringbuffer_head].len = len;
+    	memcpy((char *)ringbuffer[ringbuffer_head].payload + LWS_PRE, msg, len);
+    	if (ringbuffer_head == (MAX_MESSAGE_QUEUE - 1))
+            ringbuffer_head = 0;
+    	else
+            ringbuffer_head++;
+
+//    syslog(LOG_NOTICE, "ws: %d %s", ringbuffer_head, msg);
+    	syslog(LOG_NOTICE, "ws: broadcast buffer=%d  len=%d", ringbuffer_head, len);
+
+    	lws_callback_on_writable_all_protocol(context, &protocols[0]);
+    	err = pthread_mutex_unlock(&ws_mutex);
+    	if (err) {
+            syslog(LOG_NOTICE, "ws_broadcast pthread_mutex_unlock error %d", err);
+    	}
+    	last_msg = time(NULL);
+    }
+}
+
+
+
+/*
+ * Called every 5 seconds.
+ */
+void ws_check(void)
+{
+    time_t	now = time(NULL);
+
+    if (((int)now - (int)last_msg) > 45) {
+	ws_broadcast((char *)"{\"ping\":1}");
+    }
+}
+
+
+
+void *ws_loop(void *threadid)
+{
+    struct	lws_context_creation_info info;
+    int		n = 0;
+
+    syslog(LOG_NOTICE, "Thread ws_loop started");
+
+    memset(&info, 0, sizeof info); /* otherwise uninitialized garbage */
+    info.port = 8010;
+    info.protocols = protocols;
+    info.gid = -1;
+    info.uid = -1;
+    info.keepalive_timeout = 900;
+    info.options = LWS_SERVER_OPTION_VALIDATE_UTF8;
+
+    context = lws_create_context(&info);
+
+    if (context == NULL) {
+	syslog(LOG_NOTICE, "libwebsocket_create_context() failed");
+    }
+
+    /*
+     * Loop forever until external shutdown variable is set.
+     */
+    while (n >= 0 && ! my_shutdown) {
+
+	n = lws_service(context, 50);
+    }
+    lws_context_destroy(context);
+
+    syslog(LOG_NOTICE, "Thread ws_loop stopped");
+    return 0;
+}
+
+

mercurial