bmsd/websocket.c

changeset 671
4b54d6f79d25
child 672
23f959713fcb
equal deleted inserted replaced
670:638e7dd1d560 671:4b54d6f79d25
1 /**
2 * @file websocket.c
3 * @brief WebSockets interface
4 * @author Michiel Broek <mbroek at mbse dot eu>
5 *
6 * Copyright (C) 2020
7 *
8 * Michiel Broek <mbroek at mbse dot eu>
9 *
10 * This file is part of the bms (Brewery Management System)
11 *
12 * This is free software; you can redistribute it and/or modify it
13 * under the terms of the GNU General Public License as published by the
14 * Free Software Foundation; either version 2, or (at your option) any
15 * later version.
16 *
17 * bms is distributed in the hope that it will be useful, but
18 * WITHOUT ANY WARRANTY; without even the implied warranty of
19 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
20 * General Public License for more details.
21 *
22 * You should have received a copy of the GNU General Public License
23 * along with ThermFerm; see the file COPYING. If not, write to the Free
24 * Software Foundation, 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
25 */
26
27 #include "bms.h"
28 #include "xutil.h"
29 #include "websocket.h"
30 #include <libwebsockets.h>
31
32
33 extern sys_config Config;
34 extern int debug;
35 extern int my_shutdown;
36
37 struct lws_context *context;
38 int ws_clients = 0;
39 time_t last_msg = 0;
40 pthread_mutex_t ws_mutex;
41
42
43 /*
44 * Based on lws-mirror-protocol from libwebsockets v2.0.x
45 * Debian ships v2.0.3, on Slackware we have 2.4.0 and there
46 * are lots of changes in the api.
47 */
/*
 * Number of slots in the message ring. The fill-level arithmetic in
 * callback_ws() masks with (MAX_MESSAGE_QUEUE - 1), so this value has
 * to remain a power of two.
 */
#define MAX_MESSAGE_QUEUE 512

/*
 * One queued message. The payload is heap allocated and holds LWS_PRE
 * bytes of headroom followed by `len` bytes of message text.
 */
struct a_message {
    void    *payload;
    size_t  len;
};

/*
 * The shared message ring and the index of the slot that will be
 * written next. Each connection keeps its own read index in its
 * per-session data.
 */
static struct a_message ringbuffer[MAX_MESSAGE_QUEUE];
static int              ringbuffer_head = 0;
61
62
63
64 static int callback_ws(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len)
65 {
66 struct per_session_data__lws_mirror *pss = (struct per_session_data__lws_mirror *)user;
67 int n, m;
68
69 switch (reason) {
70
71 case LWS_CALLBACK_ESTABLISHED: {
72 ws_clients++;
73 syslog(LOG_NOTICE, "ws: new connection, total %d", ws_clients);
74 pss->ringbuffer_tail = ringbuffer_head;
75 pss->wsi = wsi;
76 break;
77 }
78
79 case LWS_CALLBACK_PROTOCOL_DESTROY:
80 syslog(LOG_NOTICE, "ws: protocol cleaning up");
81 for (n = 0; n < sizeof ringbuffer / sizeof ringbuffer[0]; n++)
82 if (ringbuffer[n].payload)
83 free(ringbuffer[n].payload);
84 break;
85
86 case LWS_CALLBACK_SERVER_WRITEABLE:
87 while (pss->ringbuffer_tail != ringbuffer_head) {
88 m = ringbuffer[pss->ringbuffer_tail].len;
89 n = lws_write(wsi, (unsigned char *)ringbuffer[pss->ringbuffer_tail].payload + LWS_PRE, m, LWS_WRITE_TEXT);
90 if (n < 0) {
91 syslog(LOG_NOTICE, "ws: ERROR %d writing", n);
92 return -1;
93 }
94 if (n < m)
95 syslog(LOG_NOTICE, "ws: partial write %d vs %d", n, m);
96
97 if (pss->ringbuffer_tail == (MAX_MESSAGE_QUEUE - 1))
98 pss->ringbuffer_tail = 0;
99 else
100 pss->ringbuffer_tail++;
101
102 if (((ringbuffer_head - pss->ringbuffer_tail) & (MAX_MESSAGE_QUEUE - 1)) == (MAX_MESSAGE_QUEUE - 15))
103 lws_rx_flow_allow_all_protocol(lws_get_context(wsi), lws_get_protocol(wsi));
104
105 if (lws_send_pipe_choked(wsi)) {
106 lws_callback_on_writable(wsi);
107 break;
108 }
109 }
110 break;
111
112 case LWS_CALLBACK_RECEIVE:
113 if (((ringbuffer_head - pss->ringbuffer_tail) & (MAX_MESSAGE_QUEUE - 1)) == (MAX_MESSAGE_QUEUE - 1)) {
114 syslog(LOG_NOTICE, "ws: dropping!");
115 goto choke;
116 }
117
118 if (ringbuffer[ringbuffer_head].payload)
119 free(ringbuffer[ringbuffer_head].payload);
120
121 ringbuffer[ringbuffer_head].payload = malloc(LWS_PRE + len);
122 ringbuffer[ringbuffer_head].len = len;
123 memcpy((char *)ringbuffer[ringbuffer_head].payload + LWS_PRE, in, len);
124 if (ringbuffer_head == (MAX_MESSAGE_QUEUE - 1))
125 ringbuffer_head = 0;
126 else
127 ringbuffer_head++;
128
129 if (((ringbuffer_head - pss->ringbuffer_tail) & (MAX_MESSAGE_QUEUE - 1)) != (MAX_MESSAGE_QUEUE - 2))
130 goto done;
131
132 choke:
133 syslog(LOG_NOTICE, "ws: LWS_CALLBACK_RECEIVE: throttling");
134 lws_rx_flow_control(wsi, 0);
135
136 done:
137 lws_callback_on_writable_all_protocol(lws_get_context(wsi), lws_get_protocol(wsi));
138 break;
139
140 case LWS_CALLBACK_CLOSED:
141 ws_clients--;
142 syslog(LOG_NOTICE, "ws: connection closed, left %d", ws_clients);
143 break;
144
145 default:
146 break;
147 }
148
149 return 0;
150 }
151
152
153
154 static struct lws_protocols protocols[] = {
155 { "bmsd-protocol", callback_ws, sizeof(struct per_session_data__lws_mirror), 128 },
156 { NULL, NULL, 0, 0 } /* terminator */
157 };
158
159
160 /*
161 * {"device":"fermenter","node":"seaport","unit":"unit0","online":1,"mode":"FRIDGE","yeast_lo":12.0,"yeast_hi":24.0,"air":19.875,"beer":19.812,"chiller":1.500,"heater":100,"cooler":0,"fan":100,"light":0,"door":0,"sp_lo":17.0,"sp_hi":17.5,"alarm":0,"stage":"PRIMARY"}
162 */
163 void ws_broadcast(char *msg)
164 {
165 int len, err;
166
167 err = pthread_mutex_lock(&ws_mutex);
168 if (err) {
169 syslog(LOG_NOTICE, "ws_broadcast pthread_mutex_lock error %d", err);
170 } else {
171
172 len = strlen(msg);
173 if (ringbuffer[ringbuffer_head].payload)
174 free(ringbuffer[ringbuffer_head].payload);
175
176 ringbuffer[ringbuffer_head].payload = malloc(LWS_PRE + len);
177 ringbuffer[ringbuffer_head].len = len;
178 memcpy((char *)ringbuffer[ringbuffer_head].payload + LWS_PRE, msg, len);
179 if (ringbuffer_head == (MAX_MESSAGE_QUEUE - 1))
180 ringbuffer_head = 0;
181 else
182 ringbuffer_head++;
183
184 // syslog(LOG_NOTICE, "ws: %d %s", ringbuffer_head, msg);
185 syslog(LOG_NOTICE, "ws: broadcast buffer=%d len=%d", ringbuffer_head, len);
186
187 lws_callback_on_writable_all_protocol(context, &protocols[0]);
188 err = pthread_mutex_unlock(&ws_mutex);
189 if (err) {
190 syslog(LOG_NOTICE, "ws_broadcast pthread_mutex_unlock error %d", err);
191 }
192 last_msg = time(NULL);
193 }
194 }
195
196
197
198 /*
199 * Called every 5 seconds.
200 */
201 void ws_check(void)
202 {
203 time_t now = time(NULL);
204
205 if (((int)now - (int)last_msg) > 45) {
206 ws_broadcast((char *)"{\"ping\":1}");
207 }
208 }
209
210
211
212 void *ws_loop(void *threadid)
213 {
214 struct lws_context_creation_info info;
215 int n = 0;
216
217 syslog(LOG_NOTICE, "Thread ws_loop started");
218
219 memset(&info, 0, sizeof info); /* otherwise uninitialized garbage */
220 info.port = 8010;
221 info.protocols = protocols;
222 info.gid = -1;
223 info.uid = -1;
224 info.keepalive_timeout = 900;
225 info.options = LWS_SERVER_OPTION_VALIDATE_UTF8;
226
227 context = lws_create_context(&info);
228
229 if (context == NULL) {
230 syslog(LOG_NOTICE, "libwebsocket_create_context() failed");
231 }
232
233 /*
234 * Loop forever until external shutdown variable is set.
235 */
236 while (n >= 0 && ! my_shutdown) {
237
238 n = lws_service(context, 50);
239 }
240 lws_context_destroy(context);
241
242 syslog(LOG_NOTICE, "Thread ws_loop stopped");
243 return 0;
244 }
245
246

mercurial