bmsd/mqtt.c

changeset 0
033898178630
child 75
1a3c6480e057
equal deleted inserted replaced
-1:000000000000 0:033898178630
1 /*****************************************************************************
2 * Copyright (C) 2017-2018
3 *
4 * Michiel Broek <mbroek at mbse dot eu>
5 *
6 * This file is part of the bms (Brewery Management System)
7 *
8 * This is free software; you can redistribute it and/or modify it
9 * under the terms of the GNU General Public License as published by the
10 * Free Software Foundation; either version 2, or (at your option) any
11 * later version.
12 *
13 * bms is distributed in the hope that it will be useful, but
14 * WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 * General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with ThermFerm; see the file COPYING. If not, write to the Free
20 * Software Foundation, 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
21 *****************************************************************************/
22
23 #include "bms.h"
24 #include "xutil.h"
25 #include "mqtt.h"
26 #include "nodes.h"
27 #include "fermenters.h"
28
29
30 extern sys_config Config;
31 extern int debug;
32
33
34
35 /* Global variables for use in callbacks. */
36 int mqtt_qos = 0;
37 int mqtt_status = STATUS_CONNECTING;
38 int mqtt_mid_sent = 0;
39 int mqtt_last_mid = -1;
40 int mqtt_last_mid_sent = -1;
41 int mqtt_connected = TRUE;
42 int mqtt_disconnect_sent = FALSE;
43 int mqtt_connect_lost = FALSE;
44 int mqtt_my_shutdown = FALSE;
45 int mqtt_use = FALSE;
46 int keepalive = 60;
47 unsigned int max_inflight = 20;
48 struct mosquitto *mosq = NULL;
49 char *state = NULL;
50 char my_hostname[256];
51 int Sequence = 0;
52
53
54 char *payload_header(void)
55 {
56 static char *tmp;
57 char buf[128];
58
59 tmp = xstrcpy((char *)"{\"timestamp\":");
60 sprintf(buf, "%ld", time(NULL));
61 tmp = xstrcat(tmp, buf);
62 tmp = xstrcat(tmp, (char *)",\"seq\":");
63 sprintf(buf, "%d", Sequence++);
64 tmp = xstrcat(tmp, buf);
65 tmp = xstrcat(tmp, (char *)",\"metric\":");
66 return tmp;
67 }
68
69
70
71 char *topic_base(char *msgtype)
72 {
73 static char *tmp;
74
75 tmp = xstrcpy((char *)"mbv1.0/brewery/");
76 tmp = xstrcat(tmp, msgtype);
77 tmp = xstrcat(tmp, (char *)"/");
78 tmp = xstrcat(tmp, my_hostname);
79 return tmp;
80 }
81
82
83
84 void my_connect_callback(struct mosquitto *my_mosq, void *obj, int result)
85 {
86 char *topic = NULL;
87
88 if (mqtt_connect_lost) {
89 mqtt_connect_lost = FALSE;
90 syslog(LOG_NOTICE, "MQTT: reconnect: %s", mosquitto_connack_string(result));
91 }
92
93 if (!result) {
94 topic = topic_base((char *)"NCMD"); // TODO: do we need this??
95 topic = xstrcat(topic, (char *)"/#");
96 mosquitto_subscribe(mosq, NULL, topic, 0);
97 free(topic);
98 topic = xstrcpy((char *)"mbv1.0/fermenters/#"); // Subscribe to fermenter messages.
99 mosquitto_subscribe(mosq, NULL, topic, 0);
100 free(topic);
101 topic = NULL;
102 mqtt_status = STATUS_CONNACK_RECVD;
103 } else {
104 syslog(LOG_NOTICE, "MQTT: my_connect_callback: %s\n", mosquitto_connack_string(result));
105 }
106 }
107
108
109
110 void my_disconnect_callback(struct mosquitto *my_mosq, void *obj, int rc)
111 {
112 if (mqtt_my_shutdown) {
113 syslog(LOG_NOTICE, "MQTT: acknowledged DISCONNECT from %s", Config.mqtt_host);
114 mqtt_connected = FALSE;
115 } else {
116 /*
117 * The remote server was brought down. We must keep running
118 */
119 syslog(LOG_NOTICE, "MQTT: received DISCONNECT from %s, connection lost", Config.mqtt_host);
120 mqtt_connect_lost = TRUE;
121 }
122 }
123
124
125
126 void my_publish_callback(struct mosquitto *my_mosq, void *obj, int mid)
127 {
128 mqtt_last_mid_sent = mid;
129 }
130
131
132
133 void my_subscribe_callback(struct mosquitto *my_mosq, void *userdata, int mid, int qos_count, const int *granted_qos)
134 {
135 int i;
136
137 syslog(LOG_NOTICE, "Subscribed (mid: %d): %d", mid, granted_qos[0]);
138 for (i = 1; i < qos_count; i++) {
139 syslog(LOG_NOTICE, " %d", granted_qos[i]);
140 }
141 }
142
143
144
145 void my_log_callback(struct mosquitto *my_mosq, void *obj, int level, const char *str)
146 {
147 syslog(LOG_NOTICE, "MQTT: %s", str);
148 if (debug)
149 fprintf(stdout, "MQTT: %s\n", str);
150 }
151
152
153
154 void my_message_callback(struct mosquitto *my_mosq, void *userdata, const struct mosquitto_message *message)
155 {
156 if (message->payloadlen) {
157 // TODO: process subscribed topics here.
158 if (strstr(message->topic, (char *)"NBIRTH") || strstr(message->topic, (char *)"NDATA")) {
159 node_birth_data(message->topic, (char *)message->payload);
160 return;
161 }
162 if (strstr(message->topic, (char *)"fermenters") && (strstr(message->topic, (char *)"DBIRTH") || strstr(message->topic, (char *)"DDATA"))) {
163 fermenter_birth_data(message->topic, (char *)message->payload);
164 return;
165 }
166 if (strstr(message->topic, (char *)"fermenters") && strstr(message->topic, (char *)"DLOG")) {
167 fermenter_log(message->topic, (char *)message->payload);
168 return;
169 }
170 syslog(LOG_NOTICE, "MQTT: message callback %s :: %d", message->topic, message->payloadlen);
171 } else {
172 if (strstr(message->topic, (char *)"NBIRTH")) {
173 // Ignore ??
174 fprintf(stdout, "MQTT: %s NULL\n", message->topic);
175 return;
176 }
177 if (strstr(message->topic, (char *)"NDEATH")) {
178 node_death(message->topic);
179 return;
180 }
181 if (strstr(message->topic, (char *)"fermenters") && strstr(message->topic, (char *)"DDEATH")) {
182 fermenter_death(message->topic);
183 return;
184 }
185 syslog(LOG_NOTICE, "MQTT: message callback %s (null)", message->topic);
186 }
187 }
188
189
190
191 void publisher(struct mosquitto *my_mosq, char *topic, char *payload, bool retain)
192 {
193 // publish the data
194 if (payload)
195 mosquitto_publish(my_mosq, &mqtt_mid_sent, topic, strlen(payload), payload, mqtt_qos, retain);
196 else
197 mosquitto_publish(my_mosq, &mqtt_mid_sent, topic, 0, NULL, mqtt_qos, retain);
198 }
199
200
201
202 void publishNData(bool birth, int flag)
203 {
204 char *topic = NULL, *payload = NULL;
205 struct utsname ubuf;
206 bool comma = false;
207
208 payload = payload_header();
209 payload = xstrcat(payload, (char *)"{");
210
211 if (birth || flag & MQTT_NODE_CONTROL) {
212 payload = xstrcat(payload, (char *)"\"nodecontrol\":{\"reboot\":false,\"rebirth\":false,\"nextserver\":false,\"scanrate\":3000}");
213 comma = true;
214 }
215
216 if (birth) {
217 if (comma)
218 payload = xstrcat(payload, (char *)",");
219 payload = xstrcat(payload, (char *)"\"properties\":{\"hardwaremake\":\"Unknown\",\"hardwaremodel\":\"Unknown\"");
220 if (uname(&ubuf) == 0) {
221 payload = xstrcat(payload, (char *)",\"os\":\"");
222 payload = xstrcat(payload, ubuf.sysname);
223 payload = xstrcat(payload, (char *)"\",\"os_version\":\"");
224 payload = xstrcat(payload, ubuf.release);
225 payload = xstrcat(payload, (char *)"\"");
226 } else {
227 payload = xstrcat(payload, (char *)",\"os\":\"Unknown\",\"os_version\":\"Unknown\"");
228 }
229
230 payload = xstrcat(payload, (char *)",\"FW\":\"");
231 payload = xstrcat(payload, (char *)VERSION);
232 payload = xstrcat(payload, (char *)"\"}");
233 comma = true;
234 }
235 payload = xstrcat(payload, (char *)"}}");
236
237 if (birth) {
238 topic = topic_base((char *)"NBIRTH");
239 publisher(mosq, topic, payload, true);
240 } else {
241 topic = topic_base((char *)"NDATA");
242 publisher(mosq, topic, payload, false);
243 }
244
245 free(payload);
246 payload = NULL;
247 free(topic);
248 topic = NULL;
249 }
250
251
252
253 int mqtt_connect(void)
254 {
255 char *id = NULL, *topic = NULL;
256 char err[1024];
257 int rc;
258
259 /*
260 * Initialize mosquitto communication
261 */
262 gethostname(my_hostname, 255);
263 mosquitto_lib_init();
264 id = xstrcpy((char *)"bmsd/");
265 id = xstrcat(id, my_hostname);
266 if (strlen(id) > MOSQ_MQTT_ID_MAX_LENGTH) {
267 /*
268 * Enforce maximum client id length of 23 characters
269 */
270 id[MOSQ_MQTT_ID_MAX_LENGTH] = '\0';
271 }
272
273 mosq = mosquitto_new(id, TRUE, NULL);
274 if (!mosq) {
275 switch(errno) {
276 case ENOMEM:
277 syslog(LOG_NOTICE, "MQTT: mosquitto_new: Out of memory");
278 break;
279 case EINVAL:
280 syslog(LOG_NOTICE, "MQTT: mosquitto_new: Invalid id");
281 break;
282 }
283 mosquitto_lib_cleanup();
284 return 1;
285 }
286 free(id);
287 id = NULL;
288
289 /*
290 * Set our will
291 */
292 topic = topic_base((char *)"NDEATH");
293 if ((rc = mosquitto_will_set(mosq, topic, 0, NULL, mqtt_qos, false))) {
294 if (rc > MOSQ_ERR_SUCCESS)
295 syslog(LOG_NOTICE, "MQTT: mosquitto_will_set: %s", mosquitto_strerror(rc));
296 mosquitto_lib_cleanup();
297 return 2;
298 }
299 free(topic);
300 topic = NULL;
301
302 if (debug)
303 mosquitto_log_callback_set(mosq, my_log_callback);
304
305 /*
306 * Username/Password
307 */
308 if (Config.mqtt_user) {
309 if (Config.mqtt_pass) {
310 rc = mosquitto_username_pw_set(mosq, Config.mqtt_user, Config.mqtt_pass);
311 } else {
312 rc = mosquitto_username_pw_set(mosq, Config.mqtt_user, NULL);
313 }
314 if (rc == MOSQ_ERR_INVAL) {
315 syslog(LOG_NOTICE, "MQTT: mosquitto_username_pw_set: input parameters invalid");
316 } else if (rc == MOSQ_ERR_NOMEM) {
317 syslog(LOG_NOTICE, "MQTT: mosquitto_username_pw_set: Out of Memory");
318 }
319 if (rc != MOSQ_ERR_SUCCESS) {
320 mosquitto_lib_cleanup();
321 return 3;
322 }
323 }
324
325 mosquitto_max_inflight_messages_set(mosq, max_inflight);
326 mosquitto_connect_callback_set(mosq, my_connect_callback);
327 mosquitto_disconnect_callback_set(mosq, my_disconnect_callback);
328 mosquitto_publish_callback_set(mosq, my_publish_callback);
329 mosquitto_message_callback_set(mosq, my_message_callback);
330 mosquitto_subscribe_callback_set(mosq, my_subscribe_callback);
331
332 if ((rc = mosquitto_connect(mosq, Config.mqtt_host, Config.mqtt_port, keepalive))) {
333 if (rc == MOSQ_ERR_ERRNO) {
334 strerror_r(errno, err, 1024);
335 syslog(LOG_NOTICE, "MQTT: mosquitto_connect: error: %s", err);
336 } else {
337 syslog(LOG_NOTICE, "MQTT: mosquitto_connect: unable to connect (%d)", rc);
338 }
339 mosquitto_lib_cleanup();
340 syslog(LOG_NOTICE, "MQTT: will run without an MQTT broker.");
341 return 4;
342 } else {
343 mqtt_use = TRUE;
344 syslog(LOG_NOTICE, "MQTT: connected with %s:%d", Config.mqtt_host, Config.mqtt_port);
345
346 /*
347 * Initialise is complete, report our presence state
348 */
349 mosquitto_loop_start(mosq);
350 publishNData(true, 0);
351 }
352
353 return 0;
354 }
355
356
357
358 void mqtt_disconnect(void)
359 {
360 int rc;
361 char *topic = NULL;
362
363 if (mqtt_use) {
364 /*
365 * Final publish 0 to clients/<hostname>/bmsd/state
366 * After that, remove the retained topic.
367 */
368 syslog(LOG_NOTICE, "MQTT disconnecting");
369 topic = topic_base((char *)"NBIRTH");
370 publisher(mosq, topic, NULL, true);
371 free(topic);
372 topic = topic_base((char *)"NDEATH");
373 publisher(mosq, topic, NULL, true);
374 free(topic);
375 topic = NULL;
376 mqtt_last_mid = mqtt_mid_sent;
377 mqtt_status = STATUS_WAITING;
378 mqtt_my_shutdown = TRUE;
379
380 do {
381 if (mqtt_status == STATUS_WAITING) {
382 if (debug)
383 fprintf(stdout, (char *)"Waiting\n");
384 if (mqtt_last_mid_sent == mqtt_last_mid && mqtt_disconnect_sent == FALSE) {
385 mosquitto_disconnect(mosq);
386 mqtt_disconnect_sent = TRUE;
387 }
388 usleep(100000);
389 }
390 rc = MOSQ_ERR_SUCCESS;
391 } while (rc == MOSQ_ERR_SUCCESS && mqtt_connected);
392
393 mosquitto_loop_stop(mosq, FALSE);
394 mosquitto_destroy(mosq);
395 mosquitto_lib_cleanup();
396 mqtt_use = FALSE;
397 mqtt_status = STATUS_CONNECTING;
398 mqtt_mid_sent = 0;
399 mqtt_last_mid = -1;
400 mqtt_last_mid_sent = -1;
401 mqtt_connected = TRUE;
402 mqtt_disconnect_sent = FALSE;
403 mqtt_connect_lost = FALSE;
404 mqtt_my_shutdown = FALSE;
405 syslog(LOG_NOTICE, "MQTT: disconnected");
406 }
407 }
408
409

mercurial