thermferm/mqtt.c

changeset 506
cdcd07bbee30
parent 504
862de87f9f89
child 508
9e67c5f9c994
equal deleted inserted replaced
504:862de87f9f89 506:cdcd07bbee30
1 /***************************************************************************** 1 /*****************************************************************************
2 * Copyright (C) 2016 2 * Copyright (C) 2016-2017
3 * 3 *
4 * Michiel Broek <mbroek at mbse dot eu> 4 * Michiel Broek <mbroek at mbse dot eu>
5 * 5 *
6 * This file is part of the mbsePi-apps 6 * This file is part of the mbsePi-apps
7 * 7 *
24 #include "xutil.h" 24 #include "xutil.h"
25 #include "mqtt.h" 25 #include "mqtt.h"
26 26
27 extern sys_config Config; 27 extern sys_config Config;
28 extern int debug; 28 extern int debug;
29 extern const char UNITMODE[5][8];
30 extern const char PROFSTATE[5][6];
31 extern const char TEMPSTATE[3][8];
32
33 int Sequence = 0;
34
29 35
30 #ifdef HAVE_MOSQUITTO_H 36 #ifdef HAVE_MOSQUITTO_H
31 37
32 38
33 /* Global variables for use in callbacks. */ 39 /* Global variables for use in callbacks. */
46 struct mosquitto *mosq = NULL; 52 struct mosquitto *mosq = NULL;
47 char *state = NULL; 53 char *state = NULL;
48 char my_hostname[256]; 54 char my_hostname[256];
49 55
50 56
57
58 char *payload_header(void)
59 {
60 char *tmp, buf[128];
61
62 tmp = xstrcpy((char *)"{\"timestamp\":");
63 sprintf(buf, "%ld", time(NULL));
64 tmp = xstrcat(tmp, buf);
65 tmp = xstrcat(tmp, (char *)",\"seq\":");
66 sprintf(buf, "%d", Sequence++);
67 tmp = xstrcat(tmp, buf);
68 tmp = xstrcat(tmp, (char *)",\"metric\":");
69 return tmp;
70 }
71
72
73
74 char *topic_base(char *msgtype)
75 {
76 char *tmp;
77
78 tmp = xstrcpy((char *)"mbv1.0/fermenters/");
79 tmp = xstrcat(tmp, msgtype);
80 tmp = xstrcat(tmp, (char *)"/");
81 tmp = xstrcat(tmp, my_hostname);
82 return tmp;
83 }
84
85
86
51 void my_connect_callback(struct mosquitto *my_mosq, void *obj, int result) 87 void my_connect_callback(struct mosquitto *my_mosq, void *obj, int result)
52 { 88 {
89 char *topic = NULL;
90
53 if (mqtt_connect_lost) { 91 if (mqtt_connect_lost) {
54 mqtt_connect_lost = FALSE; 92 mqtt_connect_lost = FALSE;
55 syslog(LOG_NOTICE, "MQTT: reconnect: %s", mosquitto_connack_string(result)); 93 syslog(LOG_NOTICE, "MQTT: reconnect: %s", mosquitto_connack_string(result));
56 } 94 }
57 95
58 if (!result) { 96 if (!result) {
59 mqtt_status = STATUS_CONNACK_RECVD; 97 topic = topic_base((char *)"NCMD");
98 topic = xstrcat(topic, (char *)"/#");
99 mosquitto_subscribe(mosq, NULL, topic, 0);
100 free(topic);
101 topic = topic_base((char *)"DCMD");
102 topic = xstrcat(topic, (char *)"/#");
103 mosquitto_subscribe(mosq, NULL, topic, 0);
104 free(topic);
105 topic = NULL;
106 mqtt_status = STATUS_CONNACK_RECVD;
60 } else { 107 } else {
61 syslog(LOG_NOTICE, "MQTT: my_connect_callback: %s\n", mosquitto_connack_string(result)); 108 syslog(LOG_NOTICE, "MQTT: my_connect_callback: %s\n", mosquitto_connack_string(result));
62 } 109 }
63 } 110 }
64 111
65 112
66 113
85 mqtt_last_mid_sent = mid; 132 mqtt_last_mid_sent = mid;
86 } 133 }
87 134
88 135
89 136
137 void my_subscribe_callback(struct mosquitto *my_mosq, void *userdata, int mid, int qos_count, const int *granted_qos)
138 {
139 int i;
140
141 syslog(LOG_NOTICE, "Subscribed (mid: %d): %d", mid, granted_qos[0]);
142 for (i = 1; i < qos_count; i++) {
143 syslog(LOG_NOTICE, " %d", granted_qos[i]);
144 }
145 }
146
147
148
90 void my_log_callback(struct mosquitto *my_mosq, void *obj, int level, const char *str) 149 void my_log_callback(struct mosquitto *my_mosq, void *obj, int level, const char *str)
91 { 150 {
92 syslog(LOG_NOTICE, "MQTT: %s", str); 151 syslog(LOG_NOTICE, "MQTT: %s", str);
93 if (debug) 152 if (debug)
94 fprintf(stdout, "MQTT: %s\n", str); 153 fprintf(stdout, "MQTT: %s\n", str);
95 } 154 }
96 155
97 156
98 157
158 void my_message_callback(struct mosquitto *my_mosq, void *userdata, const struct mosquitto_message *message)
159 {
160 if (message->payloadlen) {
161 syslog(LOG_NOTICE, "MQTT: message callback %s :: %d", message->topic, message->payloadlen);
162 // TODO: process subscribed topics here.
163
164 } else {
165 syslog(LOG_NOTICE, "MQTT: message callback %s (null)", message->topic);
166 }
167 }
168
169
170
171 void publisher(struct mosquitto *my_mosq, char *topic, char *payload, bool retain) {
172 // publish the data
173 if (payload)
174 mosquitto_publish(my_mosq, &mqtt_mid_sent, topic, strlen(payload), payload, mqtt_qos, retain);
175 else
176 mosquitto_publish(my_mosq, &mqtt_mid_sent, topic, 0, NULL, mqtt_qos, retain);
177 }
178
179
180
181 char *unit_data(units_list *unit, bool birth)
182 {
183 char *payload = NULL;
184 char buf[128];
185 bool comma = false;
186 profiles_list *profile;
187 prof_step *pstep;
188
189 payload = xstrcat(payload, (char *)"{");
190 if (birth || unit->mqtt_flag & MQTT_FLAG_MODE) {
191 // Also send these on mode change
192 payload = xstrcat(payload, (char *)"\"uuid\":\"");
193 payload = xstrcat(payload, unit->uuid);
194 payload = xstrcat(payload, (char *)"\",\"alias\":\"");
195 payload = xstrcat(payload, unit->alias);
196 payload = xstrcat(payload, (char *)"\",\"name\":\"");
197 payload = xstrcat(payload, unit->name);
198 payload = xstrcat(payload, (char *)"\"");
199 comma = true;
200 }
201 if (birth || unit->mqtt_flag & MQTT_FLAG_AIR) {
202 if (comma)
203 payload = xstrcat(payload, (char *)",");
204 if (unit->air_address) {
205 payload = xstrcat(payload, (char *)"\"air\":{\"address\":\"");
206 payload = xstrcat(payload, unit->air_address);
207 payload = xstrcat(payload, (char *)"\",\"state\":\"");
208 payload = xstrcat(payload, (char *)TEMPSTATE[unit->air_state]);
209 payload = xstrcat(payload, (char *)"\",\"temperature\":");
210 sprintf(buf, "%.3f", unit->air_temperature / 1000.0);
211 payload = xstrcat(payload, buf);
212 payload = xstrcat(payload, (char *)"}");
213 } else {
214 payload = xstrcat(payload, (char *)"\"air\":null");
215 }
216 comma = true;
217 }
218 if (birth || unit->mqtt_flag & MQTT_FLAG_BEER) {
219 if (comma)
220 payload = xstrcat(payload, (char *)",");
221 if (unit->beer_address) {
222 payload = xstrcat(payload, (char *)"\"beer\":{\"address\":\"");
223 payload = xstrcat(payload, unit->beer_address);
224 payload = xstrcat(payload, (char *)"\",\"state\":\"");
225 payload = xstrcat(payload, (char *)TEMPSTATE[unit->beer_state]);
226 payload = xstrcat(payload, (char *)"\",\"temperature\":");
227 sprintf(buf, "%.3f", unit->beer_temperature / 1000.0);
228 payload = xstrcat(payload, buf);
229 payload = xstrcat(payload, (char *)"}");
230 } else {
231 payload = xstrcat(payload, (char *)"\"beer\":null");
232 }
233 comma = true;
234 }
235 if (birth || unit->mqtt_flag & MQTT_FLAG_HEATER) {
236 if (comma)
237 payload = xstrcat(payload, (char *)",");
238 if (unit->heater_address) {
239 payload = xstrcat(payload, (char *)"\"heater\":{\"address\":\"");
240 payload = xstrcat(payload, unit->heater_address);
241 payload = xstrcat(payload, (char *)"\",\"state\":");
242 sprintf(buf, "%d", unit->heater_state);
243 payload = xstrcat(payload, buf);
244 payload = xstrcat(payload, (char *)"}");
245 } else {
246 payload = xstrcat(payload, (char *)"\"heater\":null");
247 }
248 comma = true;
249 }
250 if (birth || unit->mqtt_flag & MQTT_FLAG_COOLER) {
251 if (comma)
252 payload = xstrcat(payload, (char *)",");
253 if (unit->cooler_address) {
254 payload = xstrcat(payload, (char *)"\"cooler\":{\"address\":\"");
255 payload = xstrcat(payload, unit->cooler_address);
256 payload = xstrcat(payload, (char *)"\",\"state\":");
257 sprintf(buf, "%d", unit->cooler_state);
258 payload = xstrcat(payload, buf);
259 payload = xstrcat(payload, (char *)"}");
260 } else {
261 payload = xstrcat(payload, (char *)"\"cooler\":null");
262 }
263 comma = true;
264 }
265 if (birth || unit->mqtt_flag & MQTT_FLAG_FAN) {
266 if (comma)
267 payload = xstrcat(payload, (char *)",");
268 if (unit->fan_address) {
269 payload = xstrcat(payload, (char *)"\"fan\":{\"address\":\"");
270 payload = xstrcat(payload, unit->fan_address);
271 payload = xstrcat(payload, (char *)"\",\"state\":");
272 sprintf(buf, "%d", unit->fan_state);
273 payload = xstrcat(payload, buf);
274 payload = xstrcat(payload, (char *)"}");
275 } else {
276 payload = xstrcat(payload, (char *)"\"fan\":null");
277 }
278 comma = true;
279 }
280 if (birth || unit->mqtt_flag & MQTT_FLAG_DOOR) {
281 if (comma)
282 payload = xstrcat(payload, (char *)",");
283 if (unit->door_address) {
284 payload = xstrcat(payload, (char *)"\"door\":{\"address\":\"");
285 payload = xstrcat(payload, unit->door_address);
286 payload = xstrcat(payload, (char *)"\",\"state\":");
287 sprintf(buf, "%d", unit->door_state);
288 payload = xstrcat(payload, buf);
289 payload = xstrcat(payload, (char *)"}");
290 } else {
291 payload = xstrcat(payload, (char *)"\"door\":null");
292 }
293 comma = true;
294 }
295 if (birth || unit->mqtt_flag & MQTT_FLAG_LIGHT) {
296 if (comma)
297 payload = xstrcat(payload, (char *)",");
298 if (unit->light_address) {
299 payload = xstrcat(payload, (char *)"\"light\":{\"address\":\"");
300 payload = xstrcat(payload, unit->light_address);
301 payload = xstrcat(payload, (char *)"\",\"state\":");
302 sprintf(buf, "%d", unit->light_state);
303 payload = xstrcat(payload, buf);
304 payload = xstrcat(payload, (char *)"}");
305 } else {
306 payload = xstrcat(payload, (char *)"\"light\":null");
307 }
308 comma = true;
309 }
310 if (birth || unit->mqtt_flag & MQTT_FLAG_PSU) {
311 if (comma)
312 payload = xstrcat(payload, (char *)",");
313 if (unit->psu_address) {
314 payload = xstrcat(payload, (char *)"\"psu\":{\"address\":\"");
315 payload = xstrcat(payload, unit->psu_address);
316 payload = xstrcat(payload, (char *)"\",\"state\":");
317 sprintf(buf, "%d", unit->psu_state);
318 payload = xstrcat(payload, buf);
319 payload = xstrcat(payload, (char *)"}");
320 } else {
321 payload = xstrcat(payload, (char *)"\"psu\":null");
322 }
323 comma = true;
324 }
325 if (birth || unit->mqtt_flag & MQTT_FLAG_MODE) {
326 if (comma)
327 payload = xstrcat(payload, (char *)",");
328 payload = xstrcat(payload, (char *)"\"mode\":\"");
329 payload = xstrcat(payload, (char *)UNITMODE[unit->mode]);
330 payload = xstrcat(payload, (char *)"\"");
331 comma = true;
332 }
333 if (birth || unit->mqtt_flag & MQTT_FLAG_SP) {
334 if (unit->mode != UNITMODE_OFF) {
335 if (comma)
336 payload = xstrcat(payload, (char *)",");
337 payload = xstrcat(payload, (char *)"\"setpoint\":{\"low\":");
338 sprintf(buf, "%.1f", unit->PID_heat->SetP);
339 payload = xstrcat(payload, buf);
340 payload = xstrcat(payload, (char *)",\"high\":");
341 sprintf(buf, "%.1f", unit->PID_cool->SetP);
342 payload = xstrcat(payload, buf);
343 payload = xstrcat(payload, (char *)"}");
344 comma = true;
345 }
346 }
347 if (birth || unit->mqtt_flag & MQTT_FLAG_PROFILE || unit->mqtt_flag & MQTT_FLAG_PERCENT) {
348 if (unit->mode == UNITMODE_PROFILE && unit->profile) {
349 for (profile = Config.profiles; profile; profile = profile->next) {
350 if (strcmp(unit->profile, profile->uuid) == 0) {
351 if (comma)
352 payload = xstrcat(payload, (char *)",");
353 payload = xstrcat(payload, (char *)"\"profile\":{\"uuid\":\"");
354 payload = xstrcat(payload, unit->profile);
355 payload = xstrcat(payload, (char *)",\"name\":\"");
356 payload = xstrcat(payload, profile->name);
357 payload = xstrcat(payload, (char *)"\",\"inittemp\":{\"low\":");
358 sprintf(buf, "%.1f", profile->inittemp_lo);
359 payload = xstrcat(payload, buf);
360 payload = xstrcat(payload, (char *)",\"high\":");
361 sprintf(buf, "%.1f", profile->inittemp_hi);
362 payload = xstrcat(payload, buf);
363 payload = xstrcat(payload, (char *)"},\"fridgemode\":");
364 sprintf(buf, "%d", profile->fridge_mode);
365 payload = xstrcat(payload, buf);
366 comma = false;
367 if (profile->steps) {
368 payload = xstrcat(payload, (char *)",\"steps\":[");
369 for (pstep = profile->steps; pstep; pstep = pstep->next) {
370 if (comma)
371 payload = xstrcat(payload, (char *)",");
372 payload = xstrcat(payload, (char *)"{\"resttime\":");
373 sprintf(buf, "%d", pstep->resttime);
374 payload = xstrcat(payload, buf);
375 payload = xstrcat(payload, (char *)",\"steptime\":");
376 sprintf(buf, "%d", pstep->steptime);
377 payload = xstrcat(payload, buf);
378 payload = xstrcat(payload, (char *)",\"target\":{\"low\":");
379 sprintf(buf, "%.1f", pstep->target_lo);
380 payload = xstrcat(payload, buf);
381 payload = xstrcat(payload, (char *)",\"high\":");
382 sprintf(buf, "%.1f", pstep->target_hi);
383 payload = xstrcat(payload, buf);
384 payload = xstrcat(payload, (char *)"},\"fridgemode\":");
385 sprintf(buf, "%d", pstep->fridge_mode);
386 payload = xstrcat(payload, buf);
387 payload = xstrcat(payload, (char *)"}");
388 comma = true;
389 }
390 payload = xstrcat(payload, (char *)"]");
391 } else {
392 payload = xstrcat(payload, (char *)",\"steps\":null");
393 }
394 payload = xstrcat(payload, (char *)"}");
395 break;
396 }
397 }
398 } else {
399 if (comma)
400 payload = xstrcat(payload, (char *)",");
401 payload = xstrcat(payload, (char *)"\"profile\":null");
402 }
403 }
404 payload = xstrcat(payload, (char *)"}");
405
406 return payload;
407 }
408
409
410
411 void publishDBirth(void)
412 {
413 char *payload = NULL;
414 units_list *unit;
415 int comma = FALSE;
416
417 payload = payload_header();
418 payload = xstrcat(payload, (char *)"{\"units\":[");
419 for (unit = Config.units; unit; unit = unit->next) {
420 if (comma)
421 payload = xstrcat(payload, (char *)",");
422 payload = xstrcat(payload, unit_data(unit, true));
423 comma = TRUE;
424 }
425 payload = xstrcat(payload, (char *)"]}}");
426 publisher(mosq, topic_base((char *)"DBIRTH"), payload, true);
427 free(payload);
428 payload = NULL;
429 }
430
99 #endif 431 #endif
100 432
101 433
434 void publishDData(units_list *unit)
435 {
436 #ifdef HAVE_MOSQUITTO_H
437
438 char *payload = NULL, *topic = NULL;
439
440 if (mqtt_use) {
441 payload = payload_header();
442 payload = xstrcat(payload, unit_data(unit, false));
443 payload = xstrcat(payload, (char *)"}");
444 topic = xstrcat(topic_base((char *)"DDATA"), (char *)"/");
445 topic = xstrcat(topic, unit->alias);
446 publisher(mosq, topic, payload, false);
447 free(payload);
448 payload = NULL;
449 free(topic);
450 topic = NULL;
451 }
452 #endif
453 }
454
455
456
457 void publishNData(bool birth, int flag)
458 {
459 #ifdef HAVE_MOSQUITTO_H
460 char *payload = NULL, buf[64];
461 struct utsname ubuf;
462 bool comma = false;
463
464 payload = payload_header();
465 payload = xstrcat(payload, (char *)"{");
466
467 if (birth || flag & MQTT_NODE_CONTROL) {
468 payload = xstrcat(payload, (char *)"\"nodecontrol\":{\"reboot\":false,\"rebirth\":false,\"nextserver\":false,\"scanrate\":3000}");
469 comma = true;
470 }
471
472 if (birth) {
473 if (comma)
474 payload = xstrcat(payload, (char *)",");
475 payload = xstrcat(payload, (char *)"\"properties\":{\"hardwaremake\":\"Unknown\",\"hardwaremodel\":\"Unknown\"");
476 if (uname(&ubuf) == 0) {
477 payload = xstrcat(payload, (char *)",\"os\":\"");
478 payload = xstrcat(payload, ubuf.sysname);
479 payload = xstrcat(payload, (char *)"\",\"os_version\":\"");
480 payload = xstrcat(payload, ubuf.release);
481 payload = xstrcat(payload, (char *)"\"");
482 } else {
483 payload = xstrcat(payload, (char *)",\"os\":\"Unknown\",\"os_version\":\"Unknown\"");
484 }
485
486 payload = xstrcat(payload, (char *)",\"FW\":\"");
487 payload = xstrcat(payload, (char *)VERSION);
488 payload = xstrcat(payload, (char *)"\"}");
489 comma = true;
490 }
491
492 if (birth || flag & MQTT_NODE_HT) {
493 if (Config.temp_address || Config.hum_address) {
494 if (comma)
495 payload = xstrcat(payload, (char *)",");
496 payload = xstrcat(payload, (char *)"\"HT\":{");
497 if (Config.temp_address) {
498 payload = xstrcat(payload, (char *)"\"temperature\":");
499 sprintf(buf, "%.1f", Config.temp_value / 1000.0);
500 payload = xstrcat(payload, buf);
501 }
502 if (Config.temp_address && Config.hum_address)
503 payload = xstrcat(payload, (char *)",");
504 if (Config.hum_address) {
505 payload = xstrcat(payload, (char *)"\"humidity\":");
506 sprintf(buf, "%.1f", Config.hum_value / 1000.0);
507 payload = xstrcat(payload, buf);
508 }
509 payload = xstrcat(payload, (char *)"}");
510 }
511 }
512 payload = xstrcat(payload, (char *)"}}");
513
514 if (birth)
515 publisher(mosq, topic_base((char *)"NBIRTH"), payload, true);
516 else
517 publisher(mosq, topic_base((char *)"NDATA"), payload, false);
518
519 free(payload);
520 payload = NULL;
521 #endif
522 }
523
524
525
526 void publishBirth(void)
527 {
528 publishNData(true, 0);
529 publishDBirth();
530 }
531
532
533
534
102 void mqtt_connect(void) 535 void mqtt_connect(void)
103 { 536 {
104 #ifdef HAVE_MOSQUITTO_H 537 #ifdef HAVE_MOSQUITTO_H
105 char *id = NULL; 538 char *id = NULL;
106 char err[1024]; 539 char err[1024];
107 int rc; 540 int rc;
108 541
109 /* 542 /*
110 * Initialize mosquitto communication 543 * Initialize mosquitto communication
111 */ 544 */
112 gethostname(my_hostname, 255); 545 gethostname(my_hostname, 255);
132 } 565 }
133 mosquitto_lib_cleanup(); 566 mosquitto_lib_cleanup();
134 return; 567 return;
135 } 568 }
136 569
137 if (debug) {
138 mosquitto_log_callback_set(mosq, my_log_callback);
139 }
140
141 /* 570 /*
142 * Set our will 571 * Set our will
143 */ 572 */
144 state = xstrcpy((char *)"clients/"); 573 if ((rc = mosquitto_will_set(mosq, topic_base((char *)"NDEATH"), 0, NULL, mqtt_qos, false))) {
145 state = xstrcat(state, my_hostname); 574 if (rc > MOSQ_ERR_SUCCESS)
146 state = xstrcat(state, (char *)"/thermferm/state"); 575 syslog(LOG_NOTICE, "MQTT: mosquitto_will_set: %s", mosquitto_strerror(rc));
147 if ((rc = mosquitto_will_set(mosq, state, 1, (char *)"0", mqtt_qos, TRUE))) {
148 if (rc == MOSQ_ERR_INVAL) {
149 syslog(LOG_NOTICE, "MQTT: mosquitto_will_set: input parameters invalid");
150 } else if (rc == MOSQ_ERR_NOMEM) {
151 syslog(LOG_NOTICE, "MQTT: mosquitto_will_set: Out of Memory");
152 } else if (rc == MOSQ_ERR_PAYLOAD_SIZE) {
153 syslog(LOG_NOTICE, "MQTT: mosquitto_will_set: invalid payload size");
154 }
155 mosquitto_lib_cleanup(); 576 mosquitto_lib_cleanup();
156 return; 577 return;
157 } 578 }
158 579
580 mosquitto_log_callback_set(mosq, my_log_callback);
159 mosquitto_max_inflight_messages_set(mosq, max_inflight); 581 mosquitto_max_inflight_messages_set(mosq, max_inflight);
160 mosquitto_connect_callback_set(mosq, my_connect_callback); 582 mosquitto_connect_callback_set(mosq, my_connect_callback);
161 mosquitto_disconnect_callback_set(mosq, my_disconnect_callback); 583 mosquitto_disconnect_callback_set(mosq, my_disconnect_callback);
162 mosquitto_publish_callback_set(mosq, my_publish_callback); 584 mosquitto_publish_callback_set(mosq, my_publish_callback);
585 mosquitto_message_callback_set(mosq, my_message_callback);
586 mosquitto_subscribe_callback_set(mosq, my_subscribe_callback);
163 587
164 if ((rc = mosquitto_connect(mosq, Config.mqtt_host, Config.mqtt_port, keepalive))) { 588 if ((rc = mosquitto_connect(mosq, Config.mqtt_host, Config.mqtt_port, keepalive))) {
165 if (rc == MOSQ_ERR_ERRNO) { 589 if (rc == MOSQ_ERR_ERRNO) {
166 strerror_r(errno, err, 1024); 590 strerror_r(errno, err, 1024);
167 syslog(LOG_NOTICE, "MQTT: mosquitto_connect: error: %s", err); 591 syslog(LOG_NOTICE, "MQTT: mosquitto_connect: error: %s", err);
176 600
177 /* 601 /*
178 * Initialise is complete, report our presence state 602 * Initialise is complete, report our presence state
179 */ 603 */
180 mosquitto_loop_start(mosq); 604 mosquitto_loop_start(mosq);
181 mosquitto_publish(mosq, &mqtt_mid_sent, state, 1, (char *)"1", mqtt_qos, 1); 605 publishBirth();
182 } 606 }
183 #endif 607 #endif
184 } 608 }
185 609
186 610
187 611
188 void mqtt_disconnect(void) 612 void mqtt_disconnect(void)
189 { 613 {
190 #ifdef HAVE_MOSQUITTO_H 614 #ifdef HAVE_MOSQUITTO_H
191 int rc; 615 int rc;
192 char buf[128];
193 616
194 if (mqtt_use) { 617 if (mqtt_use) {
195 /* 618 /*
196 * Final publish 0 to clients/<hostname>/thermferm/state 619 * Final publish 0 to clients/<hostname>/thermferm/state
197 * After that, remove the retained topic. 620 * After that, remove the retained topic.
198 */ 621 */
199 syslog(LOG_NOTICE, "MQTT disconnecting"); 622 syslog(LOG_NOTICE, "MQTT disconnecting");
200 sprintf(buf, "0"); 623 publisher(mosq, topic_base((char *)"DBIRTH"), NULL, true);
201 mosquitto_publish(mosq, &mqtt_mid_sent, state, strlen(buf), buf, mqtt_qos, true); 624 publisher(mosq, topic_base((char *)"NBIRTH"), NULL, true);
202 mosquitto_publish(mosq, &mqtt_mid_sent, state, 0, NULL, mqtt_qos, true);
203 mqtt_last_mid = mqtt_mid_sent; 625 mqtt_last_mid = mqtt_mid_sent;
204 mqtt_status = STATUS_WAITING; 626 mqtt_status = STATUS_WAITING;
205 mqtt_my_shutdown = TRUE; 627 mqtt_my_shutdown = TRUE;
206 628
207 do { 629 do {
218 } while (rc == MOSQ_ERR_SUCCESS && mqtt_connected); 640 } while (rc == MOSQ_ERR_SUCCESS && mqtt_connected);
219 641
220 mosquitto_loop_stop(mosq, FALSE); 642 mosquitto_loop_stop(mosq, FALSE);
221 mosquitto_destroy(mosq); 643 mosquitto_destroy(mosq);
222 mosquitto_lib_cleanup(); 644 mosquitto_lib_cleanup();
645 mqtt_use = FALSE;
223 syslog(LOG_NOTICE, "MQTT disconnected"); 646 syslog(LOG_NOTICE, "MQTT disconnected");
224 } 647 }
225 #endif 648 #endif
226 } 649 }
227 650
228 651
229 652
230 void mqtt_publish_int(char *uuid, char *tail, int value)
231 {
232 #ifdef HAVE_MOSQUITTO_H
233 char topic[1024], buf[128];
234
235 if (mqtt_use) {
236 snprintf(topic, 1023, "fermenter/%s/%s/%s", my_hostname, uuid, tail);
237 snprintf(buf, 127, "%d", value);
238 mosquitto_publish(mosq, &mqtt_mid_sent, topic, strlen(buf), buf, mqtt_qos, 1);
239 }
240 #endif
241 }
242
243
244
245 void mqtt_publish_float(char *uuid, char *tail, float value, int decimals)
246 {
247 #ifdef HAVE_MOSQUITTO_H
248 char topic[1024], buf[128];
249
250 if (mqtt_use) {
251 snprintf(topic, 1023, "fermenter/%s/%s/%s", my_hostname, uuid, tail);
252 snprintf(buf, 127, "%.*f", decimals, value);
253 mosquitto_publish(mosq, &mqtt_mid_sent, topic, strlen(buf), buf, mqtt_qos, 1);
254 }
255 #endif
256 }
257
258
259
260 void mqtt_publish_str(char *uuid, char *tail, char *value)
261 {
262 #ifdef HAVE_MOSQUITTO_H
263 char topic[1024], buf[128];
264
265 if (mqtt_use) {
266 snprintf(topic, 1023, "fermenter/%s/%s/%s", my_hostname, uuid, tail);
267 snprintf(buf, 127, "%s", value);
268 mosquitto_publish(mosq, &mqtt_mid_sent, topic, strlen(buf), buf, mqtt_qos, 1);
269 }
270 #endif
271 }
272
273
274
275 void mqtt_publish_clear(char *uuid, char *tail)
276 {
277 #ifdef HAVE_MOSQUITTO_H
278 char topic[1024];
279
280 if (mqtt_use) {
281 snprintf(topic, 1023, "bmsd/%s/%s/%s", my_hostname, uuid, tail);
282 mosquitto_publish(mosq, &mqtt_mid_sent, topic, 0, NULL, mqtt_qos, 1);
283 }
284 #endif
285 }
286

mercurial