|
1 /** |
|
2 * @file websocket.c |
|
3 * @brief WebSockets interface |
|
4 * @author Michiel Broek <mbroek at mbse dot eu> |
|
5 * |
|
6 * Copyright (C) 2024 |
|
7 * |
|
8 * Michiel Broek <mbroek at mbse dot eu> |
|
9 * |
|
10 * This file is part of the mbsePi-apps |
|
11 * |
|
12 * This is free software; you can redistribute it and/or modify it |
|
13 * under the terms of the GNU General Public License as published by the |
|
14 * Free Software Foundation; either version 2, or (at your option) any |
|
15 * later version. |
|
16 * |
|
17 * bms is distributed in the hope that it will be useful, but |
|
18 * WITHOUT ANY WARRANTY; without even the implied warranty of |
|
19 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
|
20 * General Public License for more details. |
|
21 * |
|
22 * You should have received a copy of the GNU General Public License |
|
23 * along with ThermFerm; see the file COPYING. If not, write to the Free |
|
24 * Software Foundation, 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. |
|
25 */ |
|
26 |
|
27 #include "thermferm.h" |
|
28 #include "xutil.h" |
|
29 #include "websocket.h" |
|
30 #include <libwebsockets.h> |
|
31 |
|
32 |
|
33 extern sys_config Config; |
|
34 extern int debug; |
|
35 |
|
36 int my_ws_shutdown = 0; |
|
37 int my_ws_state = 0; |
|
38 struct lws_context *context; |
|
39 int ws_clients = 0; |
|
40 time_t last_msg = 0; |
|
41 pthread_mutex_t ws_mutex; |
|
42 |
|
43 |
|
/*
 * The broadcast queue is modelled on the libwebsockets lws-mirror-protocol
 * example.
 */

/*
 * Number of slots in the broadcast ring buffer. The write callback masks
 * index differences with (MAX_MESSAGE_QUEUE - 1), so this value has to stay
 * a power of two.
 */
#define MAX_MESSAGE_QUEUE       512

/* Receive buffer size handed to libwebsockets; also sizes the local copy buffer. */
#define WS_INBUF                2048
|
50 |
|
51 |
|
/*
 * One queued broadcast message; the ring buffer holds one of these per slot.
 */
struct a_message {
    void    *payload;   /* malloc'd block: LWS_PRE bytes of headroom, then the text */
    size_t  len;        /* length of the text only, headroom not counted */
};
|
59 |
|
60 |
|
61 static struct a_message ringbuffer[MAX_MESSAGE_QUEUE]; |
|
62 static int ringbuffer_head; |
|
63 |
|
64 |
|
65 |
|
66 static int callback_ws(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) |
|
67 { |
|
68 struct per_session_data__lws_mirror *pss = (struct per_session_data__lws_mirror *)user; |
|
69 int n, m; |
|
70 char buf[WS_INBUF + 1]; |
|
71 |
|
72 switch (reason) { |
|
73 |
|
74 case LWS_CALLBACK_ESTABLISHED: { |
|
75 ws_clients++; |
|
76 pss->ringbuffer_tail = ringbuffer_head; |
|
77 pss->wsi = wsi; |
|
78 break; |
|
79 } |
|
80 |
|
81 case LWS_CALLBACK_PROTOCOL_DESTROY: |
|
82 syslog(LOG_NOTICE, "Websocket: protocol cleaning up"); |
|
83 for (n = 0; n < sizeof ringbuffer / sizeof ringbuffer[0]; n++) |
|
84 if (ringbuffer[n].payload) |
|
85 free(ringbuffer[n].payload); |
|
86 break; |
|
87 |
|
88 case LWS_CALLBACK_SERVER_WRITEABLE: |
|
89 while (pss->ringbuffer_tail != ringbuffer_head) { |
|
90 m = ringbuffer[pss->ringbuffer_tail].len; |
|
91 n = lws_write(wsi, (unsigned char *)ringbuffer[pss->ringbuffer_tail].payload + LWS_PRE, m, LWS_WRITE_TEXT); |
|
92 if (n < 0) { |
|
93 syslog(LOG_NOTICE, "ws: ERROR %d writing", n); |
|
94 return -1; |
|
95 } |
|
96 if (n < m) |
|
97 syslog(LOG_NOTICE, "ws: partial write %d vs %d", n, m); |
|
98 |
|
99 if (pss->ringbuffer_tail == (MAX_MESSAGE_QUEUE - 1)) |
|
100 pss->ringbuffer_tail = 0; |
|
101 else |
|
102 pss->ringbuffer_tail++; |
|
103 |
|
104 if (((ringbuffer_head - pss->ringbuffer_tail) & (MAX_MESSAGE_QUEUE - 1)) == (MAX_MESSAGE_QUEUE - 15)) |
|
105 lws_rx_flow_allow_all_protocol(lws_get_context(wsi), lws_get_protocol(wsi)); |
|
106 |
|
107 if (lws_send_pipe_choked(wsi)) { |
|
108 lws_callback_on_writable(wsi); |
|
109 break; |
|
110 } |
|
111 } |
|
112 break; |
|
113 |
|
114 case LWS_CALLBACK_RECEIVE: |
|
115 |
|
116 memcpy(buf, in, len); |
|
117 buf[len] = '\0'; |
|
118 // syslog(LOG_NOTICE, "ws: reveived %ld bytes %s", len, buf); |
|
119 /* |
|
120 * These are send by bmsapp to bmsd. Then bmsd resends these via MQTT. |
|
121 * Do we want to change that? Or use it for the new web pages. |
|
122 * {"node":"rpi01","group_id":"fermenters","control":"reboot"} |
|
123 * {"node":"rpi01","group_id":"fermenters","control":"rebirth"} |
|
124 */ |
|
125 // if (strncmp(buf, (char *)"{\"device\":\"fermenters\",", 23) == 0) { |
|
126 // fermenter_ws_receive(buf); |
|
127 // } else if (strncmp(buf, (char *)"{\"device\":\"co2meters\",", 22) == 0) { |
|
128 // co2meter_ws_receive(buf); |
|
129 // } else if (strncmp(buf, (char *)"{\"device\":\"ispindels\",", 22) == 0) { |
|
130 // ispindel_ws_receive(buf); |
|
131 // } else if (strncmp(buf, (char *)"{\"node\":\"", 9) == 0) { |
|
132 // node_ws_receive(buf); |
|
133 // } |
|
134 |
|
135 break; |
|
136 |
|
137 case LWS_CALLBACK_CLOSED: |
|
138 ws_clients--; |
|
139 break; |
|
140 |
|
141 default: |
|
142 break; |
|
143 } |
|
144 |
|
145 return 0; |
|
146 } |
|
147 |
|
148 |
|
149 |
|
150 static struct lws_protocols protocols[] = { |
|
151 { "thermferm-protocol", callback_ws, sizeof(struct per_session_data__lws_mirror), WS_INBUF }, |
|
152 { NULL, NULL, 0, 0 } /* terminator */ |
|
153 }; |
|
154 |
|
155 |
|
156 /* |
|
157 * {"node":"host","group":"group","online":1,"lastseen":"datetime","temperature":20.5,"humidity":47,"ip":"ipaddr","rssi":-1} |
|
158 * {"device":"fermenters","node":"seaport","unit":"unit0","online":1,"mode":"FRIDGE","yeast_lo":12.0,"yeast_hi":24.0,"air":19.875,"beer":19.812,"chiller":1.500,"heater":100,"cooler":0,"fan":100,"light":0,"door":0,"sp_lo":17.0,"sp_hi":17.5,"alarm":0,"stage":"PRIMARY"} |
|
159 */ |
|
160 void ws_broadcast(char *msg) |
|
161 { |
|
162 int len, err; |
|
163 |
|
164 syslog(LOG_NOTICE, "%s", msg); |
|
165 err = pthread_mutex_lock(&ws_mutex); |
|
166 if (err) { |
|
167 syslog(LOG_NOTICE, "ws_broadcast pthread_mutex_lock error %d", err); |
|
168 } else { |
|
169 |
|
170 len = strlen(msg); |
|
171 if (ringbuffer[ringbuffer_head].payload) |
|
172 free(ringbuffer[ringbuffer_head].payload); |
|
173 |
|
174 ringbuffer[ringbuffer_head].payload = malloc(LWS_PRE + len); |
|
175 ringbuffer[ringbuffer_head].len = len; |
|
176 memcpy((char *)ringbuffer[ringbuffer_head].payload + LWS_PRE, msg, len); |
|
177 if (ringbuffer_head == (MAX_MESSAGE_QUEUE - 1)) |
|
178 ringbuffer_head = 0; |
|
179 else |
|
180 ringbuffer_head++; |
|
181 |
|
182 // syslog(LOG_NOTICE, "ws: broadcast buffer=%d len=%d", ringbuffer_head, len); |
|
183 |
|
184 lws_callback_on_writable_all_protocol(context, &protocols[0]); |
|
185 err = pthread_mutex_unlock(&ws_mutex); |
|
186 if (err) { |
|
187 syslog(LOG_NOTICE, "ws_broadcast pthread_mutex_unlock error %d", err); |
|
188 } |
|
189 last_msg = time(NULL); |
|
190 } |
|
191 } |
|
192 |
|
193 |
|
194 |
|
195 /* |
|
196 * Called every 5 seconds. |
|
197 */ |
|
198 void ws_check(void) |
|
199 { |
|
200 time_t now = time(NULL); |
|
201 |
|
202 if (((int)now - (int)last_msg) > 45) { |
|
203 ws_broadcast((char *)"{\"ping\":1}"); |
|
204 } |
|
205 } |
|
206 |
|
207 |
|
208 |
|
209 void *my_ws_loop(void *threadid) |
|
210 { |
|
211 struct lws_context_creation_info info; |
|
212 int n = 0; |
|
213 |
|
214 my_ws_state = 1; |
|
215 memset(&info, 0, sizeof info); /* otherwise uninitialized garbage */ |
|
216 info.port = Config.websocket_port; |
|
217 info.protocols = protocols; |
|
218 info.gid = -1; |
|
219 info.uid = -1; |
|
220 info.keepalive_timeout = 900; |
|
221 info.options = LWS_SERVER_OPTION_VALIDATE_UTF8; |
|
222 |
|
223 context = lws_create_context(&info); |
|
224 |
|
225 if (context == NULL) { |
|
226 syslog(LOG_NOTICE, "libwebsocket_create_context() failed"); |
|
227 my_ws_state = 0; |
|
228 return (void *)1; |
|
229 } |
|
230 syslog(LOG_NOTICE, "Websocket: server started port %d", info.port); |
|
231 |
|
232 /* |
|
233 * Loop forever until external shutdown variable is set. |
|
234 */ |
|
235 while (n >= 0 && ! my_ws_shutdown) { |
|
236 |
|
237 n = lws_service(context, 50); |
|
238 } |
|
239 lws_context_destroy(context); |
|
240 |
|
241 my_ws_state = 0; |
|
242 syslog(LOG_NOTICE, "Websocket: server stopped"); |
|
243 return (void *)0; |
|
244 } |
|
245 |
|
246 |