thermferm/websocket.c

changeset 675
825210ba2707
child 676
09b5efe0c633
equal deleted inserted replaced
674:6cabc02f4c8d 675:825210ba2707
1 /**
2 * @file websocket.c
3 * @brief WebSockets interface
4 * @author Michiel Broek <mbroek at mbse dot eu>
5 *
6 * Copyright (C) 2024
7 *
8 * Michiel Broek <mbroek at mbse dot eu>
9 *
10 * This file is part of the mbsePi-apps
11 *
12 * This is free software; you can redistribute it and/or modify it
13 * under the terms of the GNU General Public License as published by the
14 * Free Software Foundation; either version 2, or (at your option) any
15 * later version.
16 *
17 * bms is distributed in the hope that it will be useful, but
18 * WITHOUT ANY WARRANTY; without even the implied warranty of
19 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
20 * General Public License for more details.
21 *
22 * You should have received a copy of the GNU General Public License
23 * along with ThermFerm; see the file COPYING. If not, write to the Free
24 * Software Foundation, 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
25 */
26
27 #include "thermferm.h"
28 #include "xutil.h"
29 #include "websocket.h"
30 #include <libwebsockets.h>
31
32
33 extern sys_config Config;
34 extern int debug;
35
36 int my_ws_shutdown = 0;
37 int my_ws_state = 0;
38 struct lws_context *context;
39 int ws_clients = 0;
40 time_t last_msg = 0;
41 pthread_mutex_t ws_mutex;
42
43
44 /*
45 * Based on lws-mirror-protocol
46 */
47 #define MAX_MESSAGE_QUEUE 512
48
49 #define WS_INBUF 2048
50
51
52 /*
53 * one of these created for each message
54 */
55 struct a_message {
56 void *payload; /* is malloc'd */
57 size_t len;
58 };
59
60
61 static struct a_message ringbuffer[MAX_MESSAGE_QUEUE];
62 static int ringbuffer_head;
63
64
65
66 static int callback_ws(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len)
67 {
68 struct per_session_data__lws_mirror *pss = (struct per_session_data__lws_mirror *)user;
69 int n, m;
70 char buf[WS_INBUF + 1];
71
72 switch (reason) {
73
74 case LWS_CALLBACK_ESTABLISHED: {
75 ws_clients++;
76 pss->ringbuffer_tail = ringbuffer_head;
77 pss->wsi = wsi;
78 break;
79 }
80
81 case LWS_CALLBACK_PROTOCOL_DESTROY:
82 syslog(LOG_NOTICE, "Websocket: protocol cleaning up");
83 for (n = 0; n < sizeof ringbuffer / sizeof ringbuffer[0]; n++)
84 if (ringbuffer[n].payload)
85 free(ringbuffer[n].payload);
86 break;
87
88 case LWS_CALLBACK_SERVER_WRITEABLE:
89 while (pss->ringbuffer_tail != ringbuffer_head) {
90 m = ringbuffer[pss->ringbuffer_tail].len;
91 n = lws_write(wsi, (unsigned char *)ringbuffer[pss->ringbuffer_tail].payload + LWS_PRE, m, LWS_WRITE_TEXT);
92 if (n < 0) {
93 syslog(LOG_NOTICE, "ws: ERROR %d writing", n);
94 return -1;
95 }
96 if (n < m)
97 syslog(LOG_NOTICE, "ws: partial write %d vs %d", n, m);
98
99 if (pss->ringbuffer_tail == (MAX_MESSAGE_QUEUE - 1))
100 pss->ringbuffer_tail = 0;
101 else
102 pss->ringbuffer_tail++;
103
104 if (((ringbuffer_head - pss->ringbuffer_tail) & (MAX_MESSAGE_QUEUE - 1)) == (MAX_MESSAGE_QUEUE - 15))
105 lws_rx_flow_allow_all_protocol(lws_get_context(wsi), lws_get_protocol(wsi));
106
107 if (lws_send_pipe_choked(wsi)) {
108 lws_callback_on_writable(wsi);
109 break;
110 }
111 }
112 break;
113
114 case LWS_CALLBACK_RECEIVE:
115
116 memcpy(buf, in, len);
117 buf[len] = '\0';
118 // syslog(LOG_NOTICE, "ws: reveived %ld bytes %s", len, buf);
119 /*
120 * These are send by bmsapp to bmsd. Then bmsd resends these via MQTT.
121 * Do we want to change that? Or use it for the new web pages.
122 * {"node":"rpi01","group_id":"fermenters","control":"reboot"}
123 * {"node":"rpi01","group_id":"fermenters","control":"rebirth"}
124 */
125 // if (strncmp(buf, (char *)"{\"device\":\"fermenters\",", 23) == 0) {
126 // fermenter_ws_receive(buf);
127 // } else if (strncmp(buf, (char *)"{\"device\":\"co2meters\",", 22) == 0) {
128 // co2meter_ws_receive(buf);
129 // } else if (strncmp(buf, (char *)"{\"device\":\"ispindels\",", 22) == 0) {
130 // ispindel_ws_receive(buf);
131 // } else if (strncmp(buf, (char *)"{\"node\":\"", 9) == 0) {
132 // node_ws_receive(buf);
133 // }
134
135 break;
136
137 case LWS_CALLBACK_CLOSED:
138 ws_clients--;
139 break;
140
141 default:
142 break;
143 }
144
145 return 0;
146 }
147
148
149
150 static struct lws_protocols protocols[] = {
151 { "thermferm-protocol", callback_ws, sizeof(struct per_session_data__lws_mirror), WS_INBUF },
152 { NULL, NULL, 0, 0 } /* terminator */
153 };
154
155
156 /*
157 * {"node":"host","group":"group","online":1,"lastseen":"datetime","temperature":20.5,"humidity":47,"ip":"ipaddr","rssi":-1}
158 * {"device":"fermenters","node":"seaport","unit":"unit0","online":1,"mode":"FRIDGE","yeast_lo":12.0,"yeast_hi":24.0,"air":19.875,"beer":19.812,"chiller":1.500,"heater":100,"cooler":0,"fan":100,"light":0,"door":0,"sp_lo":17.0,"sp_hi":17.5,"alarm":0,"stage":"PRIMARY"}
159 */
160 void ws_broadcast(char *msg)
161 {
162 int len, err;
163
164 syslog(LOG_NOTICE, "%s", msg);
165 err = pthread_mutex_lock(&ws_mutex);
166 if (err) {
167 syslog(LOG_NOTICE, "ws_broadcast pthread_mutex_lock error %d", err);
168 } else {
169
170 len = strlen(msg);
171 if (ringbuffer[ringbuffer_head].payload)
172 free(ringbuffer[ringbuffer_head].payload);
173
174 ringbuffer[ringbuffer_head].payload = malloc(LWS_PRE + len);
175 ringbuffer[ringbuffer_head].len = len;
176 memcpy((char *)ringbuffer[ringbuffer_head].payload + LWS_PRE, msg, len);
177 if (ringbuffer_head == (MAX_MESSAGE_QUEUE - 1))
178 ringbuffer_head = 0;
179 else
180 ringbuffer_head++;
181
182 // syslog(LOG_NOTICE, "ws: broadcast buffer=%d len=%d", ringbuffer_head, len);
183
184 lws_callback_on_writable_all_protocol(context, &protocols[0]);
185 err = pthread_mutex_unlock(&ws_mutex);
186 if (err) {
187 syslog(LOG_NOTICE, "ws_broadcast pthread_mutex_unlock error %d", err);
188 }
189 last_msg = time(NULL);
190 }
191 }
192
193
194
195 /*
196 * Called every 5 seconds.
197 */
198 void ws_check(void)
199 {
200 time_t now = time(NULL);
201
202 if (((int)now - (int)last_msg) > 45) {
203 ws_broadcast((char *)"{\"ping\":1}");
204 }
205 }
206
207
208
209 void *my_ws_loop(void *threadid)
210 {
211 struct lws_context_creation_info info;
212 int n = 0;
213
214 my_ws_state = 1;
215 memset(&info, 0, sizeof info); /* otherwise uninitialized garbage */
216 info.port = Config.websocket_port;
217 info.protocols = protocols;
218 info.gid = -1;
219 info.uid = -1;
220 info.keepalive_timeout = 900;
221 info.options = LWS_SERVER_OPTION_VALIDATE_UTF8;
222
223 context = lws_create_context(&info);
224
225 if (context == NULL) {
226 syslog(LOG_NOTICE, "libwebsocket_create_context() failed");
227 my_ws_state = 0;
228 return (void *)1;
229 }
230 syslog(LOG_NOTICE, "Websocket: server started port %d", info.port);
231
232 /*
233 * Loop forever until external shutdown variable is set.
234 */
235 while (n >= 0 && ! my_ws_shutdown) {
236
237 n = lws_service(context, 50);
238 }
239 lws_context_destroy(context);
240
241 my_ws_state = 0;
242 syslog(LOG_NOTICE, "Websocket: server stopped");
243 return (void *)0;
244 }
245
246

mercurial