Sun, 23 Dec 2018 20:13:36 +0100
Added some icons from Brewersfriend. They should be replaced someday. Added maximum mash weight setting to the equipment database. Usefull for brew automate and RIMS systems. During recipes import acid and base additions are translated. Brews and recipes now have 2 water sources. Added water mixer. Added basic water treatment, but not for pH yet. Redesigned the fermentables and water tabs.
0 | 1 | /***************************************************************************** |
2 | * Copyright (C) 2017-2018 | |
3 | * | |
4 | * Michiel Broek <mbroek at mbse dot eu> | |
5 | * | |
6 | * This file is part of the bms (Brewery Management System) | |
7 | * | |
8 | * This is free software; you can redistribute it and/or modify it | |
9 | * under the terms of the GNU General Public License as published by the | |
10 | * Free Software Foundation; either version 2, or (at your option) any | |
11 | * later version. | |
12 | * | |
13 | * bms is distributed in the hope that it will be useful, but | |
14 | * WITHOUT ANY WARRANTY; without even the implied warranty of | |
15 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |
16 | * General Public License for more details. | |
17 | * | |
18 | * You should have received a copy of the GNU General Public License | |
19 | * along with ThermFerm; see the file COPYING. If not, write to the Free | |
20 | * Software Foundation, 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. | |
21 | *****************************************************************************/ | |
22 | ||
23 | #include "bms.h" | |
24 | #include "xutil.h" | |
25 | #include "mqtt.h" | |
26 | #include "nodes.h" | |
27 | #include "fermenters.h" | |
28 | ||
29 | ||
30 | extern sys_config Config; | |
31 | extern int debug; | |
32 | ||
33 | ||
34 | ||
35 | /* Global variables for use in callbacks. */ | |
36 | int mqtt_qos = 0; | |
37 | int mqtt_status = STATUS_CONNECTING; | |
38 | int mqtt_mid_sent = 0; | |
39 | int mqtt_last_mid = -1; | |
40 | int mqtt_last_mid_sent = -1; | |
41 | int mqtt_connected = TRUE; | |
42 | int mqtt_disconnect_sent = FALSE; | |
43 | int mqtt_connect_lost = FALSE; | |
44 | int mqtt_my_shutdown = FALSE; | |
45 | int mqtt_use = FALSE; | |
46 | int keepalive = 60; | |
47 | unsigned int max_inflight = 20; | |
48 | struct mosquitto *mosq = NULL; | |
49 | char *state = NULL; | |
50 | char my_hostname[256]; | |
51 | int Sequence = 0; | |
52 | ||
53 | ||
54 | char *payload_header(void) | |
55 | { | |
56 | static char *tmp; | |
57 | char buf[128]; | |
58 | ||
59 | tmp = xstrcpy((char *)"{\"timestamp\":"); | |
60 | sprintf(buf, "%ld", time(NULL)); | |
61 | tmp = xstrcat(tmp, buf); | |
62 | tmp = xstrcat(tmp, (char *)",\"seq\":"); | |
63 | sprintf(buf, "%d", Sequence++); | |
64 | tmp = xstrcat(tmp, buf); | |
65 | tmp = xstrcat(tmp, (char *)",\"metric\":"); | |
66 | return tmp; | |
67 | } | |
68 | ||
69 | ||
70 | ||
71 | char *topic_base(char *msgtype) | |
72 | { | |
73 | static char *tmp; | |
74 | ||
75 | tmp = xstrcpy((char *)"mbv1.0/brewery/"); | |
76 | tmp = xstrcat(tmp, msgtype); | |
77 | tmp = xstrcat(tmp, (char *)"/"); | |
78 | tmp = xstrcat(tmp, my_hostname); | |
79 | return tmp; | |
80 | } | |
81 | ||
82 | ||
83 | ||
84 | void my_connect_callback(struct mosquitto *my_mosq, void *obj, int result) | |
85 | { | |
86 | char *topic = NULL; | |
87 | ||
88 | if (mqtt_connect_lost) { | |
89 | mqtt_connect_lost = FALSE; | |
90 | syslog(LOG_NOTICE, "MQTT: reconnect: %s", mosquitto_connack_string(result)); | |
91 | } | |
92 | ||
93 | if (!result) { | |
94 | topic = topic_base((char *)"NCMD"); // TODO: do we need this?? | |
95 | topic = xstrcat(topic, (char *)"/#"); | |
96 | mosquitto_subscribe(mosq, NULL, topic, 0); | |
97 | free(topic); | |
98 | topic = xstrcpy((char *)"mbv1.0/fermenters/#"); // Subscribe to fermenter messages. | |
99 | mosquitto_subscribe(mosq, NULL, topic, 0); | |
100 | free(topic); | |
75
1a3c6480e057
Added support for brewcontrol nodes
Michiel Broek <mbroek@mbse.eu>
parents:
0
diff
changeset
|
101 | topic = xstrcpy((char *)"mbv1.0/brewcontrol/#"); // Subscribe to brewcontrol messages. |
1a3c6480e057
Added support for brewcontrol nodes
Michiel Broek <mbroek@mbse.eu>
parents:
0
diff
changeset
|
102 | mosquitto_subscribe(mosq, NULL, topic, 0); |
1a3c6480e057
Added support for brewcontrol nodes
Michiel Broek <mbroek@mbse.eu>
parents:
0
diff
changeset
|
103 | free(topic); |
0 | 104 | topic = NULL; |
105 | mqtt_status = STATUS_CONNACK_RECVD; | |
106 | } else { | |
107 | syslog(LOG_NOTICE, "MQTT: my_connect_callback: %s\n", mosquitto_connack_string(result)); | |
108 | } | |
109 | } | |
110 | ||
111 | ||
112 | ||
113 | void my_disconnect_callback(struct mosquitto *my_mosq, void *obj, int rc) | |
114 | { | |
115 | if (mqtt_my_shutdown) { | |
116 | syslog(LOG_NOTICE, "MQTT: acknowledged DISCONNECT from %s", Config.mqtt_host); | |
117 | mqtt_connected = FALSE; | |
118 | } else { | |
119 | /* | |
120 | * The remote server was brought down. We must keep running | |
121 | */ | |
122 | syslog(LOG_NOTICE, "MQTT: received DISCONNECT from %s, connection lost", Config.mqtt_host); | |
123 | mqtt_connect_lost = TRUE; | |
124 | } | |
125 | } | |
126 | ||
127 | ||
128 | ||
129 | void my_publish_callback(struct mosquitto *my_mosq, void *obj, int mid) | |
130 | { | |
131 | mqtt_last_mid_sent = mid; | |
132 | } | |
133 | ||
134 | ||
135 | ||
136 | void my_subscribe_callback(struct mosquitto *my_mosq, void *userdata, int mid, int qos_count, const int *granted_qos) | |
137 | { | |
138 | int i; | |
139 | ||
140 | syslog(LOG_NOTICE, "Subscribed (mid: %d): %d", mid, granted_qos[0]); | |
141 | for (i = 1; i < qos_count; i++) { | |
142 | syslog(LOG_NOTICE, " %d", granted_qos[i]); | |
143 | } | |
144 | } | |
145 | ||
146 | ||
147 | ||
148 | void my_log_callback(struct mosquitto *my_mosq, void *obj, int level, const char *str) | |
149 | { | |
150 | syslog(LOG_NOTICE, "MQTT: %s", str); | |
151 | if (debug) | |
152 | fprintf(stdout, "MQTT: %s\n", str); | |
153 | } | |
154 | ||
155 | ||
156 | ||
157 | void my_message_callback(struct mosquitto *my_mosq, void *userdata, const struct mosquitto_message *message) | |
158 | { | |
159 | if (message->payloadlen) { | |
160 | // TODO: process subscribed topics here. | |
161 | if (strstr(message->topic, (char *)"NBIRTH") || strstr(message->topic, (char *)"NDATA")) { | |
162 | node_birth_data(message->topic, (char *)message->payload); | |
163 | return; | |
164 | } | |
165 | if (strstr(message->topic, (char *)"fermenters") && (strstr(message->topic, (char *)"DBIRTH") || strstr(message->topic, (char *)"DDATA"))) { | |
166 | fermenter_birth_data(message->topic, (char *)message->payload); | |
167 | return; | |
168 | } | |
169 | if (strstr(message->topic, (char *)"fermenters") && strstr(message->topic, (char *)"DLOG")) { | |
170 | fermenter_log(message->topic, (char *)message->payload); | |
171 | return; | |
172 | } | |
173 | syslog(LOG_NOTICE, "MQTT: message callback %s :: %d", message->topic, message->payloadlen); | |
174 | } else { | |
175 | if (strstr(message->topic, (char *)"NBIRTH")) { | |
176 | // Ignore ?? | |
177 | fprintf(stdout, "MQTT: %s NULL\n", message->topic); | |
178 | return; | |
179 | } | |
180 | if (strstr(message->topic, (char *)"NDEATH")) { | |
181 | node_death(message->topic); | |
182 | return; | |
183 | } | |
184 | if (strstr(message->topic, (char *)"fermenters") && strstr(message->topic, (char *)"DDEATH")) { | |
185 | fermenter_death(message->topic); | |
186 | return; | |
187 | } | |
188 | syslog(LOG_NOTICE, "MQTT: message callback %s (null)", message->topic); | |
189 | } | |
190 | } | |
191 | ||
192 | ||
193 | ||
194 | void publisher(struct mosquitto *my_mosq, char *topic, char *payload, bool retain) | |
195 | { | |
196 | // publish the data | |
197 | if (payload) | |
198 | mosquitto_publish(my_mosq, &mqtt_mid_sent, topic, strlen(payload), payload, mqtt_qos, retain); | |
199 | else | |
200 | mosquitto_publish(my_mosq, &mqtt_mid_sent, topic, 0, NULL, mqtt_qos, retain); | |
201 | } | |
202 | ||
203 | ||
204 | ||
205 | void publishNData(bool birth, int flag) | |
206 | { | |
207 | char *topic = NULL, *payload = NULL; | |
208 | struct utsname ubuf; | |
209 | bool comma = false; | |
210 | ||
211 | payload = payload_header(); | |
212 | payload = xstrcat(payload, (char *)"{"); | |
213 | ||
214 | if (birth || flag & MQTT_NODE_CONTROL) { | |
215 | payload = xstrcat(payload, (char *)"\"nodecontrol\":{\"reboot\":false,\"rebirth\":false,\"nextserver\":false,\"scanrate\":3000}"); | |
216 | comma = true; | |
217 | } | |
218 | ||
219 | if (birth) { | |
220 | if (comma) | |
221 | payload = xstrcat(payload, (char *)","); | |
222 | payload = xstrcat(payload, (char *)"\"properties\":{\"hardwaremake\":\"Unknown\",\"hardwaremodel\":\"Unknown\""); | |
223 | if (uname(&ubuf) == 0) { | |
224 | payload = xstrcat(payload, (char *)",\"os\":\""); | |
225 | payload = xstrcat(payload, ubuf.sysname); | |
226 | payload = xstrcat(payload, (char *)"\",\"os_version\":\""); | |
227 | payload = xstrcat(payload, ubuf.release); | |
228 | payload = xstrcat(payload, (char *)"\""); | |
229 | } else { | |
230 | payload = xstrcat(payload, (char *)",\"os\":\"Unknown\",\"os_version\":\"Unknown\""); | |
231 | } | |
232 | ||
233 | payload = xstrcat(payload, (char *)",\"FW\":\""); | |
234 | payload = xstrcat(payload, (char *)VERSION); | |
235 | payload = xstrcat(payload, (char *)"\"}"); | |
236 | comma = true; | |
237 | } | |
238 | payload = xstrcat(payload, (char *)"}}"); | |
239 | ||
240 | if (birth) { | |
241 | topic = topic_base((char *)"NBIRTH"); | |
242 | publisher(mosq, topic, payload, true); | |
243 | } else { | |
244 | topic = topic_base((char *)"NDATA"); | |
245 | publisher(mosq, topic, payload, false); | |
246 | } | |
247 | ||
248 | free(payload); | |
249 | payload = NULL; | |
250 | free(topic); | |
251 | topic = NULL; | |
252 | } | |
253 | ||
254 | ||
255 | ||
256 | int mqtt_connect(void) | |
257 | { | |
258 | char *id = NULL, *topic = NULL; | |
259 | char err[1024]; | |
260 | int rc; | |
261 | ||
262 | /* | |
263 | * Initialize mosquitto communication | |
264 | */ | |
265 | gethostname(my_hostname, 255); | |
266 | mosquitto_lib_init(); | |
267 | id = xstrcpy((char *)"bmsd/"); | |
268 | id = xstrcat(id, my_hostname); | |
269 | if (strlen(id) > MOSQ_MQTT_ID_MAX_LENGTH) { | |
270 | /* | |
271 | * Enforce maximum client id length of 23 characters | |
272 | */ | |
273 | id[MOSQ_MQTT_ID_MAX_LENGTH] = '\0'; | |
274 | } | |
275 | ||
276 | mosq = mosquitto_new(id, TRUE, NULL); | |
277 | if (!mosq) { | |
278 | switch(errno) { | |
279 | case ENOMEM: | |
280 | syslog(LOG_NOTICE, "MQTT: mosquitto_new: Out of memory"); | |
281 | break; | |
282 | case EINVAL: | |
283 | syslog(LOG_NOTICE, "MQTT: mosquitto_new: Invalid id"); | |
284 | break; | |
285 | } | |
286 | mosquitto_lib_cleanup(); | |
287 | return 1; | |
288 | } | |
289 | free(id); | |
290 | id = NULL; | |
291 | ||
292 | /* | |
293 | * Set our will | |
294 | */ | |
295 | topic = topic_base((char *)"NDEATH"); | |
296 | if ((rc = mosquitto_will_set(mosq, topic, 0, NULL, mqtt_qos, false))) { | |
297 | if (rc > MOSQ_ERR_SUCCESS) | |
298 | syslog(LOG_NOTICE, "MQTT: mosquitto_will_set: %s", mosquitto_strerror(rc)); | |
299 | mosquitto_lib_cleanup(); | |
300 | return 2; | |
301 | } | |
302 | free(topic); | |
303 | topic = NULL; | |
304 | ||
305 | if (debug) | |
306 | mosquitto_log_callback_set(mosq, my_log_callback); | |
307 | ||
308 | /* | |
309 | * Username/Password | |
310 | */ | |
311 | if (Config.mqtt_user) { | |
312 | if (Config.mqtt_pass) { | |
313 | rc = mosquitto_username_pw_set(mosq, Config.mqtt_user, Config.mqtt_pass); | |
314 | } else { | |
315 | rc = mosquitto_username_pw_set(mosq, Config.mqtt_user, NULL); | |
316 | } | |
317 | if (rc == MOSQ_ERR_INVAL) { | |
318 | syslog(LOG_NOTICE, "MQTT: mosquitto_username_pw_set: input parameters invalid"); | |
319 | } else if (rc == MOSQ_ERR_NOMEM) { | |
320 | syslog(LOG_NOTICE, "MQTT: mosquitto_username_pw_set: Out of Memory"); | |
321 | } | |
322 | if (rc != MOSQ_ERR_SUCCESS) { | |
323 | mosquitto_lib_cleanup(); | |
324 | return 3; | |
325 | } | |
326 | } | |
327 | ||
328 | mosquitto_max_inflight_messages_set(mosq, max_inflight); | |
329 | mosquitto_connect_callback_set(mosq, my_connect_callback); | |
330 | mosquitto_disconnect_callback_set(mosq, my_disconnect_callback); | |
331 | mosquitto_publish_callback_set(mosq, my_publish_callback); | |
332 | mosquitto_message_callback_set(mosq, my_message_callback); | |
333 | mosquitto_subscribe_callback_set(mosq, my_subscribe_callback); | |
334 | ||
335 | if ((rc = mosquitto_connect(mosq, Config.mqtt_host, Config.mqtt_port, keepalive))) { | |
336 | if (rc == MOSQ_ERR_ERRNO) { | |
337 | strerror_r(errno, err, 1024); | |
338 | syslog(LOG_NOTICE, "MQTT: mosquitto_connect: error: %s", err); | |
339 | } else { | |
340 | syslog(LOG_NOTICE, "MQTT: mosquitto_connect: unable to connect (%d)", rc); | |
341 | } | |
342 | mosquitto_lib_cleanup(); | |
343 | syslog(LOG_NOTICE, "MQTT: will run without an MQTT broker."); | |
344 | return 4; | |
345 | } else { | |
346 | mqtt_use = TRUE; | |
347 | syslog(LOG_NOTICE, "MQTT: connected with %s:%d", Config.mqtt_host, Config.mqtt_port); | |
348 | ||
349 | /* | |
350 | * Initialise is complete, report our presence state | |
351 | */ | |
352 | mosquitto_loop_start(mosq); | |
353 | publishNData(true, 0); | |
354 | } | |
355 | ||
356 | return 0; | |
357 | } | |
358 | ||
359 | ||
360 | ||
361 | void mqtt_disconnect(void) | |
362 | { | |
363 | int rc; | |
364 | char *topic = NULL; | |
365 | ||
366 | if (mqtt_use) { | |
367 | /* | |
368 | * Final publish 0 to clients/<hostname>/bmsd/state | |
369 | * After that, remove the retained topic. | |
370 | */ | |
371 | syslog(LOG_NOTICE, "MQTT disconnecting"); | |
372 | topic = topic_base((char *)"NBIRTH"); | |
373 | publisher(mosq, topic, NULL, true); | |
374 | free(topic); | |
375 | topic = topic_base((char *)"NDEATH"); | |
376 | publisher(mosq, topic, NULL, true); | |
377 | free(topic); | |
378 | topic = NULL; | |
379 | mqtt_last_mid = mqtt_mid_sent; | |
380 | mqtt_status = STATUS_WAITING; | |
381 | mqtt_my_shutdown = TRUE; | |
382 | ||
383 | do { | |
384 | if (mqtt_status == STATUS_WAITING) { | |
385 | if (debug) | |
386 | fprintf(stdout, (char *)"Waiting\n"); | |
387 | if (mqtt_last_mid_sent == mqtt_last_mid && mqtt_disconnect_sent == FALSE) { | |
388 | mosquitto_disconnect(mosq); | |
389 | mqtt_disconnect_sent = TRUE; | |
390 | } | |
391 | usleep(100000); | |
392 | } | |
393 | rc = MOSQ_ERR_SUCCESS; | |
394 | } while (rc == MOSQ_ERR_SUCCESS && mqtt_connected); | |
395 | ||
396 | mosquitto_loop_stop(mosq, FALSE); | |
397 | mosquitto_destroy(mosq); | |
398 | mosquitto_lib_cleanup(); | |
399 | mqtt_use = FALSE; | |
400 | mqtt_status = STATUS_CONNECTING; | |
401 | mqtt_mid_sent = 0; | |
402 | mqtt_last_mid = -1; | |
403 | mqtt_last_mid_sent = -1; | |
404 | mqtt_connected = TRUE; | |
405 | mqtt_disconnect_sent = FALSE; | |
406 | mqtt_connect_lost = FALSE; | |
407 | mqtt_my_shutdown = FALSE; | |
408 | syslog(LOG_NOTICE, "MQTT: disconnected"); | |
409 | } | |
410 | } | |
411 | ||
412 |