Thu, 30 Jan 2020 15:29:43 +0100
Don't include carbonation sugars in the fermentables.
0 | 1 | /***************************************************************************** |
578
e75ce5bbda73
Changed the interface from the iSpindels to be the same as other devices. A webpage converts each call to two standard MQTT messages. The nodes MQTT message extended with an interval parameter. iSpindels now have a generated uuid made up from the chipid.
Michiel Broek <mbroek@mbse.eu>
parents:
572
diff
changeset
|
2 | * Copyright (C) 2017-2020 |
0 | 3 | * |
4 | * Michiel Broek <mbroek at mbse dot eu> | |
5 | * | |
6 | * This file is part of the bms (Brewery Management System) | |
7 | * | |
8 | * This is free software; you can redistribute it and/or modify it | |
9 | * under the terms of the GNU General Public License as published by the | |
10 | * Free Software Foundation; either version 2, or (at your option) any | |
11 | * later version. | |
12 | * | |
13 | * bms is distributed in the hope that it will be useful, but | |
14 | * WITHOUT ANY WARRANTY; without even the implied warranty of | |
15 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |
16 | * General Public License for more details. | |
17 | * | |
18 | * You should have received a copy of the GNU General Public License | |
19 | * along with ThermFerm; see the file COPYING. If not, write to the Free | |
20 | * Software Foundation, 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. | |
21 | *****************************************************************************/ | |
22 | ||
23 | #include "bms.h" | |
24 | #include "xutil.h" | |
25 | #include "mqtt.h" | |
26 | #include "nodes.h" | |
27 | #include "fermenters.h" | |
502
a8a6901b5a99
Added CO2 meter units to the daemon.
Michiel Broek <mbroek@mbse.eu>
parents:
501
diff
changeset
|
28 | #include "co2meters.h" |
567
6bf0afc33e70
Initial code for iSpindel support in the daemon
Michiel Broek <mbroek@mbse.eu>
parents:
502
diff
changeset
|
29 | #include "ispindels.h" |
0 | 30 | |
31 | ||
32 | extern sys_config Config; | |
33 | extern int debug; | |
34 | ||
35 | ||
36 | ||
37 | /* Global variables for use in callbacks. */ | |
38 | int mqtt_qos = 0; | |
39 | int mqtt_status = STATUS_CONNECTING; | |
40 | int mqtt_mid_sent = 0; | |
41 | int mqtt_last_mid = -1; | |
42 | int mqtt_last_mid_sent = -1; | |
43 | int mqtt_connected = TRUE; | |
44 | int mqtt_disconnect_sent = FALSE; | |
45 | int mqtt_connect_lost = FALSE; | |
46 | int mqtt_my_shutdown = FALSE; | |
47 | int mqtt_use = FALSE; | |
48 | int keepalive = 60; | |
49 | unsigned int max_inflight = 20; | |
50 | struct mosquitto *mosq = NULL; | |
51 | char *state = NULL; | |
52 | char my_hostname[256]; | |
53 | int Sequence = 0; | |
54 | ||
55 | ||
56 | char *payload_header(void) | |
57 | { | |
58 | static char *tmp; | |
59 | char buf[128]; | |
60 | ||
61 | tmp = xstrcpy((char *)"{\"timestamp\":"); | |
62 | sprintf(buf, "%ld", time(NULL)); | |
63 | tmp = xstrcat(tmp, buf); | |
64 | tmp = xstrcat(tmp, (char *)",\"seq\":"); | |
65 | sprintf(buf, "%d", Sequence++); | |
66 | tmp = xstrcat(tmp, buf); | |
67 | tmp = xstrcat(tmp, (char *)",\"metric\":"); | |
68 | return tmp; | |
69 | } | |
70 | ||
71 | ||
72 | ||
73 | char *topic_base(char *msgtype) | |
74 | { | |
75 | static char *tmp; | |
76 | ||
77 | tmp = xstrcpy((char *)"mbv1.0/brewery/"); | |
78 | tmp = xstrcat(tmp, msgtype); | |
79 | tmp = xstrcat(tmp, (char *)"/"); | |
80 | tmp = xstrcat(tmp, my_hostname); | |
81 | return tmp; | |
82 | } | |
83 | ||
84 | ||
85 | ||
86 | void my_connect_callback(struct mosquitto *my_mosq, void *obj, int result) | |
87 | { | |
88 | char *topic = NULL; | |
89 | ||
90 | if (mqtt_connect_lost) { | |
91 | mqtt_connect_lost = FALSE; | |
92 | syslog(LOG_NOTICE, "MQTT: reconnect: %s", mosquitto_connack_string(result)); | |
93 | } | |
94 | ||
95 | if (!result) { | |
96 | topic = topic_base((char *)"NCMD"); // TODO: do we need this?? | |
97 | topic = xstrcat(topic, (char *)"/#"); | |
98 | mosquitto_subscribe(mosq, NULL, topic, 0); | |
99 | free(topic); | |
501
9c41e865144a
Accept NBIRTH messages from co2meter modules.
Michiel Broek <mbroek@mbse.eu>
parents:
75
diff
changeset
|
100 | topic = xstrcpy((char *)"mbv1.0/fermenters/#"); // Subscribe to fermenter messages. |
0 | 101 | mosquitto_subscribe(mosq, NULL, topic, 0); |
102 | free(topic); | |
502
a8a6901b5a99
Added CO2 meter units to the daemon.
Michiel Broek <mbroek@mbse.eu>
parents:
501
diff
changeset
|
103 | topic = xstrcpy((char *)"mbv1.0/co2meters/#"); // Subscribe to co2meter messages. |
75
1a3c6480e057
Added support for brewcontrol nodes
Michiel Broek <mbroek@mbse.eu>
parents:
0
diff
changeset
|
104 | mosquitto_subscribe(mosq, NULL, topic, 0); |
1a3c6480e057
Added support for brewcontrol nodes
Michiel Broek <mbroek@mbse.eu>
parents:
0
diff
changeset
|
105 | free(topic); |
578
e75ce5bbda73
Changed the interface from the iSpindels to be the same as other devices. A webpage converts each call to two standard MQTT messages. The nodes MQTT message extended with an interval parameter. iSpindels now have a generated uuid made up from the chipid.
Michiel Broek <mbroek@mbse.eu>
parents:
572
diff
changeset
|
106 | topic = xstrcpy((char *)"mbv1.0/ispindels/#"); // Subscribe to ispindel messages. |
572
7a03181d29a3
Version 0.3.27 More code for iSpindels. All online/offline data in the database is now handles as integers. Nodes timeout use the configured interval time from the nodes.
Michiel Broek <mbroek@mbse.eu>
parents:
567
diff
changeset
|
107 | mosquitto_subscribe(mosq, NULL, topic, 0); |
7a03181d29a3
Version 0.3.27 More code for iSpindels. All online/offline data in the database is now handles as integers. Nodes timeout use the configured interval time from the nodes.
Michiel Broek <mbroek@mbse.eu>
parents:
567
diff
changeset
|
108 | free(topic); |
0 | 109 | topic = NULL; |
110 | mqtt_status = STATUS_CONNACK_RECVD; | |
111 | } else { | |
112 | syslog(LOG_NOTICE, "MQTT: my_connect_callback: %s\n", mosquitto_connack_string(result)); | |
113 | } | |
114 | } | |
115 | ||
116 | ||
117 | ||
118 | void my_disconnect_callback(struct mosquitto *my_mosq, void *obj, int rc) | |
119 | { | |
120 | if (mqtt_my_shutdown) { | |
121 | syslog(LOG_NOTICE, "MQTT: acknowledged DISCONNECT from %s", Config.mqtt_host); | |
122 | mqtt_connected = FALSE; | |
123 | } else { | |
124 | /* | |
125 | * The remote server was brought down. We must keep running | |
126 | */ | |
127 | syslog(LOG_NOTICE, "MQTT: received DISCONNECT from %s, connection lost", Config.mqtt_host); | |
128 | mqtt_connect_lost = TRUE; | |
129 | } | |
130 | } | |
131 | ||
132 | ||
133 | ||
134 | void my_publish_callback(struct mosquitto *my_mosq, void *obj, int mid) | |
135 | { | |
136 | mqtt_last_mid_sent = mid; | |
137 | } | |
138 | ||
139 | ||
140 | ||
141 | void my_subscribe_callback(struct mosquitto *my_mosq, void *userdata, int mid, int qos_count, const int *granted_qos) | |
142 | { | |
143 | int i; | |
144 | ||
145 | syslog(LOG_NOTICE, "Subscribed (mid: %d): %d", mid, granted_qos[0]); | |
146 | for (i = 1; i < qos_count; i++) { | |
147 | syslog(LOG_NOTICE, " %d", granted_qos[i]); | |
148 | } | |
149 | } | |
150 | ||
151 | ||
152 | ||
153 | void my_log_callback(struct mosquitto *my_mosq, void *obj, int level, const char *str) | |
154 | { | |
155 | syslog(LOG_NOTICE, "MQTT: %s", str); | |
156 | if (debug) | |
157 | fprintf(stdout, "MQTT: %s\n", str); | |
158 | } | |
159 | ||
160 | ||
161 | ||
162 | void my_message_callback(struct mosquitto *my_mosq, void *userdata, const struct mosquitto_message *message) | |
163 | { | |
164 | if (message->payloadlen) { | |
165 | // TODO: process subscribed topics here. | |
166 | if (strstr(message->topic, (char *)"NBIRTH") || strstr(message->topic, (char *)"NDATA")) { | |
167 | node_birth_data(message->topic, (char *)message->payload); | |
168 | return; | |
169 | } | |
170 | if (strstr(message->topic, (char *)"fermenters") && (strstr(message->topic, (char *)"DBIRTH") || strstr(message->topic, (char *)"DDATA"))) { | |
171 | fermenter_birth_data(message->topic, (char *)message->payload); | |
172 | return; | |
173 | } | |
174 | if (strstr(message->topic, (char *)"fermenters") && strstr(message->topic, (char *)"DLOG")) { | |
175 | fermenter_log(message->topic, (char *)message->payload); | |
176 | return; | |
177 | } | |
590
a43b8b85d8b3
Supress DCMD error messages in the log.
Michiel Broek <mbroek@mbse.eu>
parents:
578
diff
changeset
|
178 | if (strstr(message->topic, (char *)"fermenters") && strstr(message->topic, (char *)"DCMD")) { |
a43b8b85d8b3
Supress DCMD error messages in the log.
Michiel Broek <mbroek@mbse.eu>
parents:
578
diff
changeset
|
179 | return; // just ignore our own commands. |
a43b8b85d8b3
Supress DCMD error messages in the log.
Michiel Broek <mbroek@mbse.eu>
parents:
578
diff
changeset
|
180 | } |
502
a8a6901b5a99
Added CO2 meter units to the daemon.
Michiel Broek <mbroek@mbse.eu>
parents:
501
diff
changeset
|
181 | if (strstr(message->topic, (char *)"co2meters") && strstr(message->topic, (char *)"DBIRTH")) { |
a8a6901b5a99
Added CO2 meter units to the daemon.
Michiel Broek <mbroek@mbse.eu>
parents:
501
diff
changeset
|
182 | co2meter_birth_data(message->topic, (char *)message->payload); |
a8a6901b5a99
Added CO2 meter units to the daemon.
Michiel Broek <mbroek@mbse.eu>
parents:
501
diff
changeset
|
183 | return; |
a8a6901b5a99
Added CO2 meter units to the daemon.
Michiel Broek <mbroek@mbse.eu>
parents:
501
diff
changeset
|
184 | } |
a8a6901b5a99
Added CO2 meter units to the daemon.
Michiel Broek <mbroek@mbse.eu>
parents:
501
diff
changeset
|
185 | if (strstr(message->topic, (char *)"co2meters") && strstr(message->topic, (char *)"DLOG")) { |
a8a6901b5a99
Added CO2 meter units to the daemon.
Michiel Broek <mbroek@mbse.eu>
parents:
501
diff
changeset
|
186 | co2meter_log(message->topic, (char *)message->payload); |
a8a6901b5a99
Added CO2 meter units to the daemon.
Michiel Broek <mbroek@mbse.eu>
parents:
501
diff
changeset
|
187 | return; |
a8a6901b5a99
Added CO2 meter units to the daemon.
Michiel Broek <mbroek@mbse.eu>
parents:
501
diff
changeset
|
188 | } |
578
e75ce5bbda73
Changed the interface from the iSpindels to be the same as other devices. A webpage converts each call to two standard MQTT messages. The nodes MQTT message extended with an interval parameter. iSpindels now have a generated uuid made up from the chipid.
Michiel Broek <mbroek@mbse.eu>
parents:
572
diff
changeset
|
189 | if (strstr(message->topic, (char *)"ispindels") && strstr(message->topic, (char *)"DBIRTH")) { |
e75ce5bbda73
Changed the interface from the iSpindels to be the same as other devices. A webpage converts each call to two standard MQTT messages. The nodes MQTT message extended with an interval parameter. iSpindels now have a generated uuid made up from the chipid.
Michiel Broek <mbroek@mbse.eu>
parents:
572
diff
changeset
|
190 | ispindel_birth_data(message->topic, (char *)message->payload); |
e75ce5bbda73
Changed the interface from the iSpindels to be the same as other devices. A webpage converts each call to two standard MQTT messages. The nodes MQTT message extended with an interval parameter. iSpindels now have a generated uuid made up from the chipid.
Michiel Broek <mbroek@mbse.eu>
parents:
572
diff
changeset
|
191 | return; |
567
6bf0afc33e70
Initial code for iSpindel support in the daemon
Michiel Broek <mbroek@mbse.eu>
parents:
502
diff
changeset
|
192 | } |
0 | 193 | syslog(LOG_NOTICE, "MQTT: message callback %s :: %d", message->topic, message->payloadlen); |
194 | } else { | |
195 | if (strstr(message->topic, (char *)"NBIRTH")) { | |
196 | // Ignore ?? | |
197 | fprintf(stdout, "MQTT: %s NULL\n", message->topic); | |
198 | return; | |
199 | } | |
200 | if (strstr(message->topic, (char *)"NDEATH")) { | |
201 | node_death(message->topic); | |
202 | return; | |
203 | } | |
204 | if (strstr(message->topic, (char *)"fermenters") && strstr(message->topic, (char *)"DDEATH")) { | |
205 | fermenter_death(message->topic); | |
206 | return; | |
207 | } | |
208 | syslog(LOG_NOTICE, "MQTT: message callback %s (null)", message->topic); | |
209 | } | |
210 | } | |
211 | ||
212 | ||
213 | ||
214 | void publisher(struct mosquitto *my_mosq, char *topic, char *payload, bool retain) | |
215 | { | |
216 | // publish the data | |
217 | if (payload) | |
218 | mosquitto_publish(my_mosq, &mqtt_mid_sent, topic, strlen(payload), payload, mqtt_qos, retain); | |
219 | else | |
220 | mosquitto_publish(my_mosq, &mqtt_mid_sent, topic, 0, NULL, mqtt_qos, retain); | |
221 | } | |
222 | ||
223 | ||
224 | ||
225 | void publishNData(bool birth, int flag) | |
226 | { | |
227 | char *topic = NULL, *payload = NULL; | |
228 | struct utsname ubuf; | |
229 | bool comma = false; | |
230 | ||
231 | payload = payload_header(); | |
232 | payload = xstrcat(payload, (char *)"{"); | |
233 | ||
234 | if (birth || flag & MQTT_NODE_CONTROL) { | |
235 | payload = xstrcat(payload, (char *)"\"nodecontrol\":{\"reboot\":false,\"rebirth\":false,\"nextserver\":false,\"scanrate\":3000}"); | |
236 | comma = true; | |
237 | } | |
238 | ||
239 | if (birth) { | |
240 | if (comma) | |
241 | payload = xstrcat(payload, (char *)","); | |
242 | payload = xstrcat(payload, (char *)"\"properties\":{\"hardwaremake\":\"Unknown\",\"hardwaremodel\":\"Unknown\""); | |
243 | if (uname(&ubuf) == 0) { | |
244 | payload = xstrcat(payload, (char *)",\"os\":\""); | |
245 | payload = xstrcat(payload, ubuf.sysname); | |
246 | payload = xstrcat(payload, (char *)"\",\"os_version\":\""); | |
247 | payload = xstrcat(payload, ubuf.release); | |
248 | payload = xstrcat(payload, (char *)"\""); | |
249 | } else { | |
250 | payload = xstrcat(payload, (char *)",\"os\":\"Unknown\",\"os_version\":\"Unknown\""); | |
251 | } | |
252 | ||
253 | payload = xstrcat(payload, (char *)",\"FW\":\""); | |
254 | payload = xstrcat(payload, (char *)VERSION); | |
255 | payload = xstrcat(payload, (char *)"\"}"); | |
256 | comma = true; | |
257 | } | |
258 | payload = xstrcat(payload, (char *)"}}"); | |
259 | ||
260 | if (birth) { | |
261 | topic = topic_base((char *)"NBIRTH"); | |
262 | publisher(mosq, topic, payload, true); | |
263 | } else { | |
264 | topic = topic_base((char *)"NDATA"); | |
265 | publisher(mosq, topic, payload, false); | |
266 | } | |
267 | ||
268 | free(payload); | |
269 | payload = NULL; | |
270 | free(topic); | |
271 | topic = NULL; | |
272 | } | |
273 | ||
274 | ||
275 | ||
276 | int mqtt_connect(void) | |
277 | { | |
278 | char *id = NULL, *topic = NULL; | |
279 | char err[1024]; | |
280 | int rc; | |
281 | ||
282 | /* | |
283 | * Initialize mosquitto communication | |
284 | */ | |
285 | gethostname(my_hostname, 255); | |
286 | mosquitto_lib_init(); | |
287 | id = xstrcpy((char *)"bmsd/"); | |
288 | id = xstrcat(id, my_hostname); | |
289 | if (strlen(id) > MOSQ_MQTT_ID_MAX_LENGTH) { | |
290 | /* | |
291 | * Enforce maximum client id length of 23 characters | |
292 | */ | |
293 | id[MOSQ_MQTT_ID_MAX_LENGTH] = '\0'; | |
294 | } | |
295 | ||
296 | mosq = mosquitto_new(id, TRUE, NULL); | |
297 | if (!mosq) { | |
298 | switch(errno) { | |
299 | case ENOMEM: | |
300 | syslog(LOG_NOTICE, "MQTT: mosquitto_new: Out of memory"); | |
301 | break; | |
302 | case EINVAL: | |
303 | syslog(LOG_NOTICE, "MQTT: mosquitto_new: Invalid id"); | |
304 | break; | |
305 | } | |
306 | mosquitto_lib_cleanup(); | |
307 | return 1; | |
308 | } | |
309 | free(id); | |
310 | id = NULL; | |
311 | ||
312 | /* | |
313 | * Set our will | |
314 | */ | |
315 | topic = topic_base((char *)"NDEATH"); | |
316 | if ((rc = mosquitto_will_set(mosq, topic, 0, NULL, mqtt_qos, false))) { | |
317 | if (rc > MOSQ_ERR_SUCCESS) | |
318 | syslog(LOG_NOTICE, "MQTT: mosquitto_will_set: %s", mosquitto_strerror(rc)); | |
319 | mosquitto_lib_cleanup(); | |
320 | return 2; | |
321 | } | |
322 | free(topic); | |
323 | topic = NULL; | |
324 | ||
325 | if (debug) | |
326 | mosquitto_log_callback_set(mosq, my_log_callback); | |
327 | ||
328 | /* | |
329 | * Username/Password | |
330 | */ | |
331 | if (Config.mqtt_user) { | |
332 | if (Config.mqtt_pass) { | |
333 | rc = mosquitto_username_pw_set(mosq, Config.mqtt_user, Config.mqtt_pass); | |
334 | } else { | |
335 | rc = mosquitto_username_pw_set(mosq, Config.mqtt_user, NULL); | |
336 | } | |
337 | if (rc == MOSQ_ERR_INVAL) { | |
338 | syslog(LOG_NOTICE, "MQTT: mosquitto_username_pw_set: input parameters invalid"); | |
339 | } else if (rc == MOSQ_ERR_NOMEM) { | |
340 | syslog(LOG_NOTICE, "MQTT: mosquitto_username_pw_set: Out of Memory"); | |
341 | } | |
342 | if (rc != MOSQ_ERR_SUCCESS) { | |
343 | mosquitto_lib_cleanup(); | |
344 | return 3; | |
345 | } | |
346 | } | |
347 | ||
348 | mosquitto_max_inflight_messages_set(mosq, max_inflight); | |
349 | mosquitto_connect_callback_set(mosq, my_connect_callback); | |
350 | mosquitto_disconnect_callback_set(mosq, my_disconnect_callback); | |
351 | mosquitto_publish_callback_set(mosq, my_publish_callback); | |
352 | mosquitto_message_callback_set(mosq, my_message_callback); | |
353 | mosquitto_subscribe_callback_set(mosq, my_subscribe_callback); | |
354 | ||
355 | if ((rc = mosquitto_connect(mosq, Config.mqtt_host, Config.mqtt_port, keepalive))) { | |
356 | if (rc == MOSQ_ERR_ERRNO) { | |
357 | strerror_r(errno, err, 1024); | |
358 | syslog(LOG_NOTICE, "MQTT: mosquitto_connect: error: %s", err); | |
359 | } else { | |
360 | syslog(LOG_NOTICE, "MQTT: mosquitto_connect: unable to connect (%d)", rc); | |
361 | } | |
362 | mosquitto_lib_cleanup(); | |
363 | syslog(LOG_NOTICE, "MQTT: will run without an MQTT broker."); | |
364 | return 4; | |
365 | } else { | |
366 | mqtt_use = TRUE; | |
367 | syslog(LOG_NOTICE, "MQTT: connected with %s:%d", Config.mqtt_host, Config.mqtt_port); | |
368 | ||
369 | /* | |
370 | * Initialise is complete, report our presence state | |
371 | */ | |
372 | mosquitto_loop_start(mosq); | |
373 | publishNData(true, 0); | |
374 | } | |
375 | ||
376 | return 0; | |
377 | } | |
378 | ||
379 | ||
380 | ||
381 | void mqtt_disconnect(void) | |
382 | { | |
383 | int rc; | |
384 | char *topic = NULL; | |
385 | ||
386 | if (mqtt_use) { | |
387 | /* | |
388 | * Final publish 0 to clients/<hostname>/bmsd/state | |
389 | * After that, remove the retained topic. | |
390 | */ | |
391 | syslog(LOG_NOTICE, "MQTT disconnecting"); | |
392 | topic = topic_base((char *)"NBIRTH"); | |
393 | publisher(mosq, topic, NULL, true); | |
394 | free(topic); | |
395 | topic = topic_base((char *)"NDEATH"); | |
396 | publisher(mosq, topic, NULL, true); | |
397 | free(topic); | |
398 | topic = NULL; | |
399 | mqtt_last_mid = mqtt_mid_sent; | |
400 | mqtt_status = STATUS_WAITING; | |
401 | mqtt_my_shutdown = TRUE; | |
402 | ||
403 | do { | |
404 | if (mqtt_status == STATUS_WAITING) { | |
405 | if (debug) | |
406 | fprintf(stdout, (char *)"Waiting\n"); | |
407 | if (mqtt_last_mid_sent == mqtt_last_mid && mqtt_disconnect_sent == FALSE) { | |
408 | mosquitto_disconnect(mosq); | |
409 | mqtt_disconnect_sent = TRUE; | |
410 | } | |
411 | usleep(100000); | |
412 | } | |
413 | rc = MOSQ_ERR_SUCCESS; | |
414 | } while (rc == MOSQ_ERR_SUCCESS && mqtt_connected); | |
415 | ||
416 | mosquitto_loop_stop(mosq, FALSE); | |
417 | mosquitto_destroy(mosq); | |
418 | mosquitto_lib_cleanup(); | |
419 | mqtt_use = FALSE; | |
420 | mqtt_status = STATUS_CONNECTING; | |
421 | mqtt_mid_sent = 0; | |
422 | mqtt_last_mid = -1; | |
423 | mqtt_last_mid_sent = -1; | |
424 | mqtt_connected = TRUE; | |
425 | mqtt_disconnect_sent = FALSE; | |
426 | mqtt_connect_lost = FALSE; | |
427 | mqtt_my_shutdown = FALSE; | |
428 | syslog(LOG_NOTICE, "MQTT: disconnected"); | |
429 | } | |
430 | } | |
431 | ||
432 |