|
1 /** |
|
2 * @file websocket.c |
|
3 * @brief WebSockets interface |
|
4 * @author Michiel Broek <mbroek at mbse dot eu> |
|
5 * |
|
6 * Copyright (C) 2020 |
|
7 * |
|
8 * Michiel Broek <mbroek at mbse dot eu> |
|
9 * |
|
10 * This file is part of the bms (Brewery Management System) |
|
11 * |
|
12 * This is free software; you can redistribute it and/or modify it |
|
13 * under the terms of the GNU General Public License as published by the |
|
14 * Free Software Foundation; either version 2, or (at your option) any |
|
15 * later version. |
|
16 * |
|
17 * bms is distributed in the hope that it will be useful, but |
|
18 * WITHOUT ANY WARRANTY; without even the implied warranty of |
|
19 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
|
20 * General Public License for more details. |
|
21 * |
|
22 * You should have received a copy of the GNU General Public License |
|
23 * along with ThermFerm; see the file COPYING. If not, write to the Free |
|
24 * Software Foundation, 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. |
|
25 */ |
|
26 |
|
27 #include "bms.h" |
|
28 #include "xutil.h" |
|
29 #include "websocket.h" |
|
30 #include <libwebsockets.h> |
|
31 |
|
32 |
|
33 extern sys_config Config; |
|
34 extern int debug; |
|
35 extern int my_shutdown; |
|
36 |
|
37 struct lws_context *context; |
|
38 int ws_clients = 0; |
|
39 time_t last_msg = 0; |
|
40 pthread_mutex_t ws_mutex; |
|
41 |
|
42 |
|
43 /* |
|
44 * Based on lws-mirror-protocol from libwebsockets v2.0.x |
|
45 * Debian ships v2.0.3, on Slackware we have 2.4.0 and there |
|
46 * are lots of changes in the api. |
|
47 */ |
|
48 #define MAX_MESSAGE_QUEUE 512 |
|
49 |
|
50 /* |
|
51 * one of these created for each message |
|
52 */ |
|
53 struct a_message { |
|
54 void *payload; /* is malloc'd */ |
|
55 size_t len; |
|
56 }; |
|
57 |
|
58 |
|
59 static struct a_message ringbuffer[MAX_MESSAGE_QUEUE]; |
|
60 static int ringbuffer_head; |
|
61 |
|
62 |
|
63 |
|
64 static int callback_ws(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) |
|
65 { |
|
66 struct per_session_data__lws_mirror *pss = (struct per_session_data__lws_mirror *)user; |
|
67 int n, m; |
|
68 |
|
69 switch (reason) { |
|
70 |
|
71 case LWS_CALLBACK_ESTABLISHED: { |
|
72 ws_clients++; |
|
73 syslog(LOG_NOTICE, "ws: new connection, total %d", ws_clients); |
|
74 pss->ringbuffer_tail = ringbuffer_head; |
|
75 pss->wsi = wsi; |
|
76 break; |
|
77 } |
|
78 |
|
79 case LWS_CALLBACK_PROTOCOL_DESTROY: |
|
80 syslog(LOG_NOTICE, "ws: protocol cleaning up"); |
|
81 for (n = 0; n < sizeof ringbuffer / sizeof ringbuffer[0]; n++) |
|
82 if (ringbuffer[n].payload) |
|
83 free(ringbuffer[n].payload); |
|
84 break; |
|
85 |
|
86 case LWS_CALLBACK_SERVER_WRITEABLE: |
|
87 while (pss->ringbuffer_tail != ringbuffer_head) { |
|
88 m = ringbuffer[pss->ringbuffer_tail].len; |
|
89 n = lws_write(wsi, (unsigned char *)ringbuffer[pss->ringbuffer_tail].payload + LWS_PRE, m, LWS_WRITE_TEXT); |
|
90 if (n < 0) { |
|
91 syslog(LOG_NOTICE, "ws: ERROR %d writing", n); |
|
92 return -1; |
|
93 } |
|
94 if (n < m) |
|
95 syslog(LOG_NOTICE, "ws: partial write %d vs %d", n, m); |
|
96 |
|
97 if (pss->ringbuffer_tail == (MAX_MESSAGE_QUEUE - 1)) |
|
98 pss->ringbuffer_tail = 0; |
|
99 else |
|
100 pss->ringbuffer_tail++; |
|
101 |
|
102 if (((ringbuffer_head - pss->ringbuffer_tail) & (MAX_MESSAGE_QUEUE - 1)) == (MAX_MESSAGE_QUEUE - 15)) |
|
103 lws_rx_flow_allow_all_protocol(lws_get_context(wsi), lws_get_protocol(wsi)); |
|
104 |
|
105 if (lws_send_pipe_choked(wsi)) { |
|
106 lws_callback_on_writable(wsi); |
|
107 break; |
|
108 } |
|
109 } |
|
110 break; |
|
111 |
|
112 case LWS_CALLBACK_RECEIVE: |
|
113 if (((ringbuffer_head - pss->ringbuffer_tail) & (MAX_MESSAGE_QUEUE - 1)) == (MAX_MESSAGE_QUEUE - 1)) { |
|
114 syslog(LOG_NOTICE, "ws: dropping!"); |
|
115 goto choke; |
|
116 } |
|
117 |
|
118 if (ringbuffer[ringbuffer_head].payload) |
|
119 free(ringbuffer[ringbuffer_head].payload); |
|
120 |
|
121 ringbuffer[ringbuffer_head].payload = malloc(LWS_PRE + len); |
|
122 ringbuffer[ringbuffer_head].len = len; |
|
123 memcpy((char *)ringbuffer[ringbuffer_head].payload + LWS_PRE, in, len); |
|
124 if (ringbuffer_head == (MAX_MESSAGE_QUEUE - 1)) |
|
125 ringbuffer_head = 0; |
|
126 else |
|
127 ringbuffer_head++; |
|
128 |
|
129 if (((ringbuffer_head - pss->ringbuffer_tail) & (MAX_MESSAGE_QUEUE - 1)) != (MAX_MESSAGE_QUEUE - 2)) |
|
130 goto done; |
|
131 |
|
132 choke: |
|
133 syslog(LOG_NOTICE, "ws: LWS_CALLBACK_RECEIVE: throttling"); |
|
134 lws_rx_flow_control(wsi, 0); |
|
135 |
|
136 done: |
|
137 lws_callback_on_writable_all_protocol(lws_get_context(wsi), lws_get_protocol(wsi)); |
|
138 break; |
|
139 |
|
140 case LWS_CALLBACK_CLOSED: |
|
141 ws_clients--; |
|
142 syslog(LOG_NOTICE, "ws: connection closed, left %d", ws_clients); |
|
143 break; |
|
144 |
|
145 default: |
|
146 break; |
|
147 } |
|
148 |
|
149 return 0; |
|
150 } |
|
151 |
|
152 |
|
153 |
|
154 static struct lws_protocols protocols[] = { |
|
155 { "bmsd-protocol", callback_ws, sizeof(struct per_session_data__lws_mirror), 128 }, |
|
156 { NULL, NULL, 0, 0 } /* terminator */ |
|
157 }; |
|
158 |
|
159 |
|
160 /* |
|
161 * {"device":"fermenter","node":"seaport","unit":"unit0","online":1,"mode":"FRIDGE","yeast_lo":12.0,"yeast_hi":24.0,"air":19.875,"beer":19.812,"chiller":1.500,"heater":100,"cooler":0,"fan":100,"light":0,"door":0,"sp_lo":17.0,"sp_hi":17.5,"alarm":0,"stage":"PRIMARY"} |
|
162 */ |
|
163 void ws_broadcast(char *msg) |
|
164 { |
|
165 int len, err; |
|
166 |
|
167 err = pthread_mutex_lock(&ws_mutex); |
|
168 if (err) { |
|
169 syslog(LOG_NOTICE, "ws_broadcast pthread_mutex_lock error %d", err); |
|
170 } else { |
|
171 |
|
172 len = strlen(msg); |
|
173 if (ringbuffer[ringbuffer_head].payload) |
|
174 free(ringbuffer[ringbuffer_head].payload); |
|
175 |
|
176 ringbuffer[ringbuffer_head].payload = malloc(LWS_PRE + len); |
|
177 ringbuffer[ringbuffer_head].len = len; |
|
178 memcpy((char *)ringbuffer[ringbuffer_head].payload + LWS_PRE, msg, len); |
|
179 if (ringbuffer_head == (MAX_MESSAGE_QUEUE - 1)) |
|
180 ringbuffer_head = 0; |
|
181 else |
|
182 ringbuffer_head++; |
|
183 |
|
184 // syslog(LOG_NOTICE, "ws: %d %s", ringbuffer_head, msg); |
|
185 syslog(LOG_NOTICE, "ws: broadcast buffer=%d len=%d", ringbuffer_head, len); |
|
186 |
|
187 lws_callback_on_writable_all_protocol(context, &protocols[0]); |
|
188 err = pthread_mutex_unlock(&ws_mutex); |
|
189 if (err) { |
|
190 syslog(LOG_NOTICE, "ws_broadcast pthread_mutex_unlock error %d", err); |
|
191 } |
|
192 last_msg = time(NULL); |
|
193 } |
|
194 } |
|
195 |
|
196 |
|
197 |
|
198 /* |
|
199 * Called every 5 seconds. |
|
200 */ |
|
201 void ws_check(void) |
|
202 { |
|
203 time_t now = time(NULL); |
|
204 |
|
205 if (((int)now - (int)last_msg) > 45) { |
|
206 ws_broadcast((char *)"{\"ping\":1}"); |
|
207 } |
|
208 } |
|
209 |
|
210 |
|
211 |
|
212 void *ws_loop(void *threadid) |
|
213 { |
|
214 struct lws_context_creation_info info; |
|
215 int n = 0; |
|
216 |
|
217 syslog(LOG_NOTICE, "Thread ws_loop started"); |
|
218 |
|
219 memset(&info, 0, sizeof info); /* otherwise uninitialized garbage */ |
|
220 info.port = 8010; |
|
221 info.protocols = protocols; |
|
222 info.gid = -1; |
|
223 info.uid = -1; |
|
224 info.keepalive_timeout = 900; |
|
225 info.options = LWS_SERVER_OPTION_VALIDATE_UTF8; |
|
226 |
|
227 context = lws_create_context(&info); |
|
228 |
|
229 if (context == NULL) { |
|
230 syslog(LOG_NOTICE, "libwebsocket_create_context() failed"); |
|
231 } |
|
232 |
|
233 /* |
|
234 * Loop forever until external shutdown variable is set. |
|
235 */ |
|
236 while (n >= 0 && ! my_shutdown) { |
|
237 |
|
238 n = lws_service(context, 50); |
|
239 } |
|
240 lws_context_destroy(context); |
|
241 |
|
242 syslog(LOG_NOTICE, "Thread ws_loop stopped"); |
|
243 return 0; |
|
244 } |
|
245 |
|
246 |