Wed, 18 Oct 2023 10:06:11 +0200
Version 0.3.45. Removed all writing to ascii logfiles in the webserver environment, only log to MySQL.
0 | 1 | /***************************************************************************** |
747
b6fbe6821468
Replace TRUE and FALSE for lowercase version.
Michiel Broek <mbroek@mbse.eu>
parents:
679
diff
changeset
|
2 | * Copyright (C) 2017-2021 |
0 | 3 | * |
4 | * Michiel Broek <mbroek at mbse dot eu> | |
5 | * | |
6 | * This file is part of the bms (Brewery Management System) | |
7 | * | |
8 | * This is free software; you can redistribute it and/or modify it | |
9 | * under the terms of the GNU General Public License as published by the | |
10 | * Free Software Foundation; either version 2, or (at your option) any | |
11 | * later version. | |
12 | * | |
13 | * bms is distributed in the hope that it will be useful, but | |
14 | * WITHOUT ANY WARRANTY; without even the implied warranty of | |
15 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |
16 | * General Public License for more details. | |
17 | * | |
18 | * You should have received a copy of the GNU General Public License | |
19 | * along with ThermFerm; see the file COPYING. If not, write to the Free | |
20 | * Software Foundation, 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. | |
21 | *****************************************************************************/ | |
22 | ||
23 | #include "bms.h" | |
24 | #include "xutil.h" | |
25 | #include "mqtt.h" | |
26 | #include "nodes.h" | |
27 | #include "fermenters.h" | |
502
a8a6901b5a99
Added CO2 meter units to the daemon.
Michiel Broek <mbroek@mbse.eu>
parents:
501
diff
changeset
|
28 | #include "co2meters.h" |
567
6bf0afc33e70
Initial code for iSpindel support in the daemon
Michiel Broek <mbroek@mbse.eu>
parents:
502
diff
changeset
|
29 | #include "ispindels.h" |
0 | 30 | |
31 | ||
32 | extern sys_config Config; | |
33 | extern int debug; | |
34 | ||
35 | ||
36 | ||
/*
 * Global variables for use in callbacks.
 *
 * These are written from the mosquitto callbacks in this file and read by
 * mqtt_connect()/mqtt_disconnect() and the publish helpers.
 */
int                 mqtt_qos = 0;                       /* QoS passed to mosquitto_publish() and mosquitto_will_set()   */
int                 mqtt_status = STATUS_CONNECTING;    /* STATUS_CONNACK_RECVD after a good connect, STATUS_WAITING during shutdown */
int                 mqtt_mid_sent = 0;                  /* Message id filled in by mosquitto_publish()                  */
int                 mqtt_last_mid = -1;                 /* Copy of mqtt_mid_sent taken when mqtt_disconnect() starts    */
int                 mqtt_last_mid_sent = -1;            /* Message id reported by my_publish_callback()                 */
int                 mqtt_connected = true;              /* Cleared by my_disconnect_callback() during our own shutdown  */
int                 mqtt_disconnect_sent = false;       /* Set once mqtt_disconnect() has called mosquitto_disconnect() */
int                 mqtt_connect_lost = false;          /* Set on an unexpected disconnect, cleared on the next connect */
int                 mqtt_my_shutdown = false;           /* Set by mqtt_disconnect() so the disconnect is seen as ours   */
int                 mqtt_use = false;                   /* Set after mosquitto_connect() succeeded in mqtt_connect()    */
int                 keepalive = 60;                     /* Keepalive value passed to mosquitto_connect()                */
unsigned int        max_inflight = 20;                  /* Passed to mosquitto_max_inflight_messages_set()              */
struct mosquitto    *mosq = NULL;                       /* Client handle created by mosquitto_new()                     */
char                *state = NULL;                      /* NOTE(review): not referenced in this part of the file        */
char                my_hostname[256];                   /* Filled by gethostname(); used in the client id and topics    */
int                 Sequence = 0;                       /* "seq" value, incremented by every payload_header() call      */
54 | ||
55 | ||
56 | char *payload_header(void) | |
57 | { | |
58 | static char *tmp; | |
59 | char buf[128]; | |
60 | ||
61 | tmp = xstrcpy((char *)"{\"timestamp\":"); | |
62 | sprintf(buf, "%ld", time(NULL)); | |
63 | tmp = xstrcat(tmp, buf); | |
64 | tmp = xstrcat(tmp, (char *)",\"seq\":"); | |
65 | sprintf(buf, "%d", Sequence++); | |
66 | tmp = xstrcat(tmp, buf); | |
67 | tmp = xstrcat(tmp, (char *)",\"metric\":"); | |
68 | return tmp; | |
69 | } | |
70 | ||
71 | ||
72 | ||
73 | char *topic_base(char *msgtype) | |
74 | { | |
75 | static char *tmp; | |
76 | ||
77 | tmp = xstrcpy((char *)"mbv1.0/brewery/"); | |
78 | tmp = xstrcat(tmp, msgtype); | |
79 | tmp = xstrcat(tmp, (char *)"/"); | |
80 | tmp = xstrcat(tmp, my_hostname); | |
81 | return tmp; | |
82 | } | |
83 | ||
84 | ||
85 | ||
86 | void my_connect_callback(struct mosquitto *my_mosq, void *obj, int result) | |
87 | { | |
88 | char *topic = NULL; | |
89 | ||
90 | if (mqtt_connect_lost) { | |
747
b6fbe6821468
Replace TRUE and FALSE for lowercase version.
Michiel Broek <mbroek@mbse.eu>
parents:
679
diff
changeset
|
91 | mqtt_connect_lost = false; |
0 | 92 | syslog(LOG_NOTICE, "MQTT: reconnect: %s", mosquitto_connack_string(result)); |
93 | } | |
94 | ||
95 | if (!result) { | |
96 | topic = topic_base((char *)"NCMD"); // TODO: do we need this?? | |
97 | topic = xstrcat(topic, (char *)"/#"); | |
98 | mosquitto_subscribe(mosq, NULL, topic, 0); | |
99 | free(topic); | |
501
9c41e865144a
Accept NBIRTH messages from co2meter modules.
Michiel Broek <mbroek@mbse.eu>
parents:
75
diff
changeset
|
100 | topic = xstrcpy((char *)"mbv1.0/fermenters/#"); // Subscribe to fermenter messages. |
0 | 101 | mosquitto_subscribe(mosq, NULL, topic, 0); |
102 | free(topic); | |
502
a8a6901b5a99
Added CO2 meter units to the daemon.
Michiel Broek <mbroek@mbse.eu>
parents:
501
diff
changeset
|
103 | topic = xstrcpy((char *)"mbv1.0/co2meters/#"); // Subscribe to co2meter messages. |
75
1a3c6480e057
Added support for brewcontrol nodes
Michiel Broek <mbroek@mbse.eu>
parents:
0
diff
changeset
|
104 | mosquitto_subscribe(mosq, NULL, topic, 0); |
1a3c6480e057
Added support for brewcontrol nodes
Michiel Broek <mbroek@mbse.eu>
parents:
0
diff
changeset
|
105 | free(topic); |
578
e75ce5bbda73
Changed the interface from the iSpindels to be the same as other devices. A webpage converts each call to two standard MQTT messages. The nodes MQTT message extended with an interval parameter. iSpindels now have a generated uuid made up from the chipid.
Michiel Broek <mbroek@mbse.eu>
parents:
572
diff
changeset
|
106 | topic = xstrcpy((char *)"mbv1.0/ispindels/#"); // Subscribe to ispindel messages. |
572
7a03181d29a3
Version 0.3.27 More code for iSpindels. All online/offline data in the database is now handles as integers. Nodes timeout use the configured interval time from the nodes.
Michiel Broek <mbroek@mbse.eu>
parents:
567
diff
changeset
|
107 | mosquitto_subscribe(mosq, NULL, topic, 0); |
7a03181d29a3
Version 0.3.27 More code for iSpindels. All online/offline data in the database is now handles as integers. Nodes timeout use the configured interval time from the nodes.
Michiel Broek <mbroek@mbse.eu>
parents:
567
diff
changeset
|
108 | free(topic); |
0 | 109 | topic = NULL; |
110 | mqtt_status = STATUS_CONNACK_RECVD; | |
111 | } else { | |
112 | syslog(LOG_NOTICE, "MQTT: my_connect_callback: %s\n", mosquitto_connack_string(result)); | |
113 | } | |
114 | } | |
115 | ||
116 | ||
117 | ||
118 | void my_disconnect_callback(struct mosquitto *my_mosq, void *obj, int rc) | |
119 | { | |
120 | if (mqtt_my_shutdown) { | |
121 | syslog(LOG_NOTICE, "MQTT: acknowledged DISCONNECT from %s", Config.mqtt_host); | |
747
b6fbe6821468
Replace TRUE and FALSE for lowercase version.
Michiel Broek <mbroek@mbse.eu>
parents:
679
diff
changeset
|
122 | mqtt_connected = false; |
0 | 123 | } else { |
124 | /* | |
125 | * The remote server was brought down. We must keep running | |
126 | */ | |
127 | syslog(LOG_NOTICE, "MQTT: received DISCONNECT from %s, connection lost", Config.mqtt_host); | |
747
b6fbe6821468
Replace TRUE and FALSE for lowercase version.
Michiel Broek <mbroek@mbse.eu>
parents:
679
diff
changeset
|
128 | mqtt_connect_lost = true; |
0 | 129 | } |
130 | } | |
131 | ||
132 | ||
133 | ||
134 | void my_publish_callback(struct mosquitto *my_mosq, void *obj, int mid) | |
135 | { | |
136 | mqtt_last_mid_sent = mid; | |
137 | } | |
138 | ||
139 | ||
140 | ||
/*
 * Subscribe callback: log the message id and every granted QoS value.
 *
 * The first granted QoS goes on the same log line as the message id, each
 * further one gets its own line. Without any granted QoS values (qos_count
 * below 1 or no array) only the message id is logged, so the array is never
 * read out of bounds.
 */
void my_subscribe_callback(struct mosquitto *my_mosq, void *userdata, int mid, int qos_count, const int *granted_qos)
{
    int     i;

    (void)my_mosq;
    (void)userdata;

    if (qos_count < 1 || granted_qos == NULL) {
        syslog(LOG_NOTICE, "Subscribed (mid: %d)", mid);
        return;
    }

    syslog(LOG_NOTICE, "Subscribed (mid: %d): %d", mid, granted_qos[0]);
    for (i = 1; i < qos_count; i++) {
        syslog(LOG_NOTICE, " %d", granted_qos[i]);
    }
}
150 | ||
151 | ||
152 | ||
153 | void my_log_callback(struct mosquitto *my_mosq, void *obj, int level, const char *str) | |
154 | { | |
155 | syslog(LOG_NOTICE, "MQTT: %s", str); | |
156 | if (debug) | |
157 | fprintf(stdout, "MQTT: %s\n", str); | |
158 | } | |
159 | ||
160 | ||
161 | ||
162 | void my_message_callback(struct mosquitto *my_mosq, void *userdata, const struct mosquitto_message *message) | |
163 | { | |
164 | if (message->payloadlen) { | |
165 | // TODO: process subscribed topics here. | |
166 | if (strstr(message->topic, (char *)"NBIRTH") || strstr(message->topic, (char *)"NDATA")) { | |
167 | node_birth_data(message->topic, (char *)message->payload); | |
168 | return; | |
169 | } | |
170 | if (strstr(message->topic, (char *)"fermenters") && (strstr(message->topic, (char *)"DBIRTH") || strstr(message->topic, (char *)"DDATA"))) { | |
171 | fermenter_birth_data(message->topic, (char *)message->payload); | |
172 | return; | |
173 | } | |
174 | if (strstr(message->topic, (char *)"fermenters") && strstr(message->topic, (char *)"DLOG")) { | |
175 | fermenter_log(message->topic, (char *)message->payload); | |
176 | return; | |
177 | } | |
590
a43b8b85d8b3
Supress DCMD error messages in the log.
Michiel Broek <mbroek@mbse.eu>
parents:
578
diff
changeset
|
178 | if (strstr(message->topic, (char *)"fermenters") && strstr(message->topic, (char *)"DCMD")) { |
a43b8b85d8b3
Supress DCMD error messages in the log.
Michiel Broek <mbroek@mbse.eu>
parents:
578
diff
changeset
|
179 | return; // just ignore our own commands. |
a43b8b85d8b3
Supress DCMD error messages in the log.
Michiel Broek <mbroek@mbse.eu>
parents:
578
diff
changeset
|
180 | } |
502
a8a6901b5a99
Added CO2 meter units to the daemon.
Michiel Broek <mbroek@mbse.eu>
parents:
501
diff
changeset
|
181 | if (strstr(message->topic, (char *)"co2meters") && strstr(message->topic, (char *)"DBIRTH")) { |
a8a6901b5a99
Added CO2 meter units to the daemon.
Michiel Broek <mbroek@mbse.eu>
parents:
501
diff
changeset
|
182 | co2meter_birth_data(message->topic, (char *)message->payload); |
a8a6901b5a99
Added CO2 meter units to the daemon.
Michiel Broek <mbroek@mbse.eu>
parents:
501
diff
changeset
|
183 | return; |
a8a6901b5a99
Added CO2 meter units to the daemon.
Michiel Broek <mbroek@mbse.eu>
parents:
501
diff
changeset
|
184 | } |
a8a6901b5a99
Added CO2 meter units to the daemon.
Michiel Broek <mbroek@mbse.eu>
parents:
501
diff
changeset
|
185 | if (strstr(message->topic, (char *)"co2meters") && strstr(message->topic, (char *)"DLOG")) { |
a8a6901b5a99
Added CO2 meter units to the daemon.
Michiel Broek <mbroek@mbse.eu>
parents:
501
diff
changeset
|
186 | co2meter_log(message->topic, (char *)message->payload); |
a8a6901b5a99
Added CO2 meter units to the daemon.
Michiel Broek <mbroek@mbse.eu>
parents:
501
diff
changeset
|
187 | return; |
a8a6901b5a99
Added CO2 meter units to the daemon.
Michiel Broek <mbroek@mbse.eu>
parents:
501
diff
changeset
|
188 | } |
578
e75ce5bbda73
Changed the interface from the iSpindels to be the same as other devices. A webpage converts each call to two standard MQTT messages. The nodes MQTT message extended with an interval parameter. iSpindels now have a generated uuid made up from the chipid.
Michiel Broek <mbroek@mbse.eu>
parents:
572
diff
changeset
|
189 | if (strstr(message->topic, (char *)"ispindels") && strstr(message->topic, (char *)"DBIRTH")) { |
e75ce5bbda73
Changed the interface from the iSpindels to be the same as other devices. A webpage converts each call to two standard MQTT messages. The nodes MQTT message extended with an interval parameter. iSpindels now have a generated uuid made up from the chipid.
Michiel Broek <mbroek@mbse.eu>
parents:
572
diff
changeset
|
190 | ispindel_birth_data(message->topic, (char *)message->payload); |
e75ce5bbda73
Changed the interface from the iSpindels to be the same as other devices. A webpage converts each call to two standard MQTT messages. The nodes MQTT message extended with an interval parameter. iSpindels now have a generated uuid made up from the chipid.
Michiel Broek <mbroek@mbse.eu>
parents:
572
diff
changeset
|
191 | return; |
567
6bf0afc33e70
Initial code for iSpindel support in the daemon
Michiel Broek <mbroek@mbse.eu>
parents:
502
diff
changeset
|
192 | } |
0 | 193 | syslog(LOG_NOTICE, "MQTT: message callback %s :: %d", message->topic, message->payloadlen); |
194 | } else { | |
195 | if (strstr(message->topic, (char *)"NBIRTH")) { | |
196 | // Ignore ?? | |
197 | fprintf(stdout, "MQTT: %s NULL\n", message->topic); | |
198 | return; | |
199 | } | |
200 | if (strstr(message->topic, (char *)"NDEATH")) { | |
201 | node_death(message->topic); | |
202 | return; | |
203 | } | |
204 | if (strstr(message->topic, (char *)"fermenters") && strstr(message->topic, (char *)"DDEATH")) { | |
205 | fermenter_death(message->topic); | |
206 | return; | |
207 | } | |
208 | syslog(LOG_NOTICE, "MQTT: message callback %s (null)", message->topic); | |
209 | } | |
210 | } | |
211 | ||
212 | ||
213 | ||
214 | void publisher(struct mosquitto *my_mosq, char *topic, char *payload, bool retain) | |
215 | { | |
216 | // publish the data | |
217 | if (payload) | |
218 | mosquitto_publish(my_mosq, &mqtt_mid_sent, topic, strlen(payload), payload, mqtt_qos, retain); | |
219 | else | |
220 | mosquitto_publish(my_mosq, &mqtt_mid_sent, topic, 0, NULL, mqtt_qos, retain); | |
221 | } | |
222 | ||
223 | ||
224 | ||
679
48f8f3fce7c0
Added reconnecting-websocket.js to automatic reconnect the websocket if the connection is lost. Usefull for mobile devices that go to sleep after a while. Changed mon_fermenters to use websockets instead of polling. Fixed wrong temperature color ranges on the fermenter monior. Increased the websocket receive buffer to 2048. In cannot overflow, but larger messages are chunked and the application does not handle these split messages. Needs termferm 0.9.9 or newer.
Michiel Broek <mbroek@mbse.eu>
parents:
590
diff
changeset
|
225 | void mqtt_publish(char *topic, char *payload) |
48f8f3fce7c0
Added reconnecting-websocket.js to automatic reconnect the websocket if the connection is lost. Usefull for mobile devices that go to sleep after a while. Changed mon_fermenters to use websockets instead of polling. Fixed wrong temperature color ranges on the fermenter monior. Increased the websocket receive buffer to 2048. In cannot overflow, but larger messages are chunked and the application does not handle these split messages. Needs termferm 0.9.9 or newer.
Michiel Broek <mbroek@mbse.eu>
parents:
590
diff
changeset
|
226 | { |
48f8f3fce7c0
Added reconnecting-websocket.js to automatic reconnect the websocket if the connection is lost. Usefull for mobile devices that go to sleep after a while. Changed mon_fermenters to use websockets instead of polling. Fixed wrong temperature color ranges on the fermenter monior. Increased the websocket receive buffer to 2048. In cannot overflow, but larger messages are chunked and the application does not handle these split messages. Needs termferm 0.9.9 or newer.
Michiel Broek <mbroek@mbse.eu>
parents:
590
diff
changeset
|
227 | mosquitto_publish(mosq, &mqtt_mid_sent, topic, strlen(payload), payload, mqtt_qos, false); |
48f8f3fce7c0
Added reconnecting-websocket.js to automatic reconnect the websocket if the connection is lost. Usefull for mobile devices that go to sleep after a while. Changed mon_fermenters to use websockets instead of polling. Fixed wrong temperature color ranges on the fermenter monior. Increased the websocket receive buffer to 2048. In cannot overflow, but larger messages are chunked and the application does not handle these split messages. Needs termferm 0.9.9 or newer.
Michiel Broek <mbroek@mbse.eu>
parents:
590
diff
changeset
|
228 | } |
48f8f3fce7c0
Added reconnecting-websocket.js to automatic reconnect the websocket if the connection is lost. Usefull for mobile devices that go to sleep after a while. Changed mon_fermenters to use websockets instead of polling. Fixed wrong temperature color ranges on the fermenter monior. Increased the websocket receive buffer to 2048. In cannot overflow, but larger messages are chunked and the application does not handle these split messages. Needs termferm 0.9.9 or newer.
Michiel Broek <mbroek@mbse.eu>
parents:
590
diff
changeset
|
229 | |
48f8f3fce7c0
Added reconnecting-websocket.js to automatic reconnect the websocket if the connection is lost. Usefull for mobile devices that go to sleep after a while. Changed mon_fermenters to use websockets instead of polling. Fixed wrong temperature color ranges on the fermenter monior. Increased the websocket receive buffer to 2048. In cannot overflow, but larger messages are chunked and the application does not handle these split messages. Needs termferm 0.9.9 or newer.
Michiel Broek <mbroek@mbse.eu>
parents:
590
diff
changeset
|
230 | |
48f8f3fce7c0
Added reconnecting-websocket.js to automatic reconnect the websocket if the connection is lost. Usefull for mobile devices that go to sleep after a while. Changed mon_fermenters to use websockets instead of polling. Fixed wrong temperature color ranges on the fermenter monior. Increased the websocket receive buffer to 2048. In cannot overflow, but larger messages are chunked and the application does not handle these split messages. Needs termferm 0.9.9 or newer.
Michiel Broek <mbroek@mbse.eu>
parents:
590
diff
changeset
|
231 | |
0 | 232 | void publishNData(bool birth, int flag) |
233 | { | |
234 | char *topic = NULL, *payload = NULL; | |
235 | struct utsname ubuf; | |
236 | bool comma = false; | |
237 | ||
238 | payload = payload_header(); | |
239 | payload = xstrcat(payload, (char *)"{"); | |
240 | ||
241 | if (birth || flag & MQTT_NODE_CONTROL) { | |
242 | payload = xstrcat(payload, (char *)"\"nodecontrol\":{\"reboot\":false,\"rebirth\":false,\"nextserver\":false,\"scanrate\":3000}"); | |
243 | comma = true; | |
244 | } | |
245 | ||
246 | if (birth) { | |
247 | if (comma) | |
248 | payload = xstrcat(payload, (char *)","); | |
249 | payload = xstrcat(payload, (char *)"\"properties\":{\"hardwaremake\":\"Unknown\",\"hardwaremodel\":\"Unknown\""); | |
250 | if (uname(&ubuf) == 0) { | |
251 | payload = xstrcat(payload, (char *)",\"os\":\""); | |
252 | payload = xstrcat(payload, ubuf.sysname); | |
253 | payload = xstrcat(payload, (char *)"\",\"os_version\":\""); | |
254 | payload = xstrcat(payload, ubuf.release); | |
255 | payload = xstrcat(payload, (char *)"\""); | |
256 | } else { | |
257 | payload = xstrcat(payload, (char *)",\"os\":\"Unknown\",\"os_version\":\"Unknown\""); | |
258 | } | |
259 | ||
260 | payload = xstrcat(payload, (char *)",\"FW\":\""); | |
261 | payload = xstrcat(payload, (char *)VERSION); | |
262 | payload = xstrcat(payload, (char *)"\"}"); | |
263 | comma = true; | |
264 | } | |
265 | payload = xstrcat(payload, (char *)"}}"); | |
266 | ||
267 | if (birth) { | |
268 | topic = topic_base((char *)"NBIRTH"); | |
269 | publisher(mosq, topic, payload, true); | |
270 | } else { | |
271 | topic = topic_base((char *)"NDATA"); | |
272 | publisher(mosq, topic, payload, false); | |
273 | } | |
274 | ||
275 | free(payload); | |
276 | payload = NULL; | |
277 | free(topic); | |
278 | topic = NULL; | |
279 | } | |
280 | ||
281 | ||
282 | ||
283 | int mqtt_connect(void) | |
284 | { | |
285 | char *id = NULL, *topic = NULL; | |
286 | char err[1024]; | |
287 | int rc; | |
288 | ||
289 | /* | |
290 | * Initialize mosquitto communication | |
291 | */ | |
292 | gethostname(my_hostname, 255); | |
293 | mosquitto_lib_init(); | |
294 | id = xstrcpy((char *)"bmsd/"); | |
295 | id = xstrcat(id, my_hostname); | |
296 | if (strlen(id) > MOSQ_MQTT_ID_MAX_LENGTH) { | |
297 | /* | |
298 | * Enforce maximum client id length of 23 characters | |
299 | */ | |
300 | id[MOSQ_MQTT_ID_MAX_LENGTH] = '\0'; | |
301 | } | |
302 | ||
747
b6fbe6821468
Replace TRUE and FALSE for lowercase version.
Michiel Broek <mbroek@mbse.eu>
parents:
679
diff
changeset
|
303 | mosq = mosquitto_new(id, true, NULL); |
0 | 304 | if (!mosq) { |
305 | switch(errno) { | |
306 | case ENOMEM: | |
307 | syslog(LOG_NOTICE, "MQTT: mosquitto_new: Out of memory"); | |
308 | break; | |
309 | case EINVAL: | |
310 | syslog(LOG_NOTICE, "MQTT: mosquitto_new: Invalid id"); | |
311 | break; | |
312 | } | |
313 | mosquitto_lib_cleanup(); | |
314 | return 1; | |
315 | } | |
316 | free(id); | |
317 | id = NULL; | |
318 | ||
319 | /* | |
320 | * Set our will | |
321 | */ | |
322 | topic = topic_base((char *)"NDEATH"); | |
323 | if ((rc = mosquitto_will_set(mosq, topic, 0, NULL, mqtt_qos, false))) { | |
324 | if (rc > MOSQ_ERR_SUCCESS) | |
325 | syslog(LOG_NOTICE, "MQTT: mosquitto_will_set: %s", mosquitto_strerror(rc)); | |
326 | mosquitto_lib_cleanup(); | |
327 | return 2; | |
328 | } | |
329 | free(topic); | |
330 | topic = NULL; | |
331 | ||
332 | if (debug) | |
333 | mosquitto_log_callback_set(mosq, my_log_callback); | |
334 | ||
335 | /* | |
336 | * Username/Password | |
337 | */ | |
338 | if (Config.mqtt_user) { | |
339 | if (Config.mqtt_pass) { | |
340 | rc = mosquitto_username_pw_set(mosq, Config.mqtt_user, Config.mqtt_pass); | |
341 | } else { | |
342 | rc = mosquitto_username_pw_set(mosq, Config.mqtt_user, NULL); | |
343 | } | |
344 | if (rc == MOSQ_ERR_INVAL) { | |
345 | syslog(LOG_NOTICE, "MQTT: mosquitto_username_pw_set: input parameters invalid"); | |
346 | } else if (rc == MOSQ_ERR_NOMEM) { | |
347 | syslog(LOG_NOTICE, "MQTT: mosquitto_username_pw_set: Out of Memory"); | |
348 | } | |
349 | if (rc != MOSQ_ERR_SUCCESS) { | |
350 | mosquitto_lib_cleanup(); | |
351 | return 3; | |
352 | } | |
353 | } | |
354 | ||
355 | mosquitto_max_inflight_messages_set(mosq, max_inflight); | |
356 | mosquitto_connect_callback_set(mosq, my_connect_callback); | |
357 | mosquitto_disconnect_callback_set(mosq, my_disconnect_callback); | |
358 | mosquitto_publish_callback_set(mosq, my_publish_callback); | |
359 | mosquitto_message_callback_set(mosq, my_message_callback); | |
360 | mosquitto_subscribe_callback_set(mosq, my_subscribe_callback); | |
361 | ||
362 | if ((rc = mosquitto_connect(mosq, Config.mqtt_host, Config.mqtt_port, keepalive))) { | |
363 | if (rc == MOSQ_ERR_ERRNO) { | |
364 | strerror_r(errno, err, 1024); | |
365 | syslog(LOG_NOTICE, "MQTT: mosquitto_connect: error: %s", err); | |
366 | } else { | |
367 | syslog(LOG_NOTICE, "MQTT: mosquitto_connect: unable to connect (%d)", rc); | |
368 | } | |
369 | mosquitto_lib_cleanup(); | |
370 | syslog(LOG_NOTICE, "MQTT: will run without an MQTT broker."); | |
371 | return 4; | |
372 | } else { | |
747
b6fbe6821468
Replace TRUE and FALSE for lowercase version.
Michiel Broek <mbroek@mbse.eu>
parents:
679
diff
changeset
|
373 | mqtt_use = true; |
0 | 374 | syslog(LOG_NOTICE, "MQTT: connected with %s:%d", Config.mqtt_host, Config.mqtt_port); |
375 | ||
376 | /* | |
377 | * Initialise is complete, report our presence state | |
378 | */ | |
379 | mosquitto_loop_start(mosq); | |
380 | publishNData(true, 0); | |
381 | } | |
382 | ||
383 | return 0; | |
384 | } | |
385 | ||
386 | ||
387 | ||
388 | void mqtt_disconnect(void) | |
389 | { | |
390 | int rc; | |
391 | char *topic = NULL; | |
392 | ||
393 | if (mqtt_use) { | |
394 | /* | |
395 | * Final publish 0 to clients/<hostname>/bmsd/state | |
396 | * After that, remove the retained topic. | |
397 | */ | |
398 | syslog(LOG_NOTICE, "MQTT disconnecting"); | |
399 | topic = topic_base((char *)"NBIRTH"); | |
400 | publisher(mosq, topic, NULL, true); | |
401 | free(topic); | |
402 | topic = topic_base((char *)"NDEATH"); | |
403 | publisher(mosq, topic, NULL, true); | |
404 | free(topic); | |
405 | topic = NULL; | |
406 | mqtt_last_mid = mqtt_mid_sent; | |
407 | mqtt_status = STATUS_WAITING; | |
747
b6fbe6821468
Replace TRUE and FALSE for lowercase version.
Michiel Broek <mbroek@mbse.eu>
parents:
679
diff
changeset
|
408 | mqtt_my_shutdown = true; |
0 | 409 | |
410 | do { | |
411 | if (mqtt_status == STATUS_WAITING) { | |
412 | if (debug) | |
413 | fprintf(stdout, (char *)"Waiting\n"); | |
747
b6fbe6821468
Replace TRUE and FALSE for lowercase version.
Michiel Broek <mbroek@mbse.eu>
parents:
679
diff
changeset
|
414 | if (mqtt_last_mid_sent == mqtt_last_mid && mqtt_disconnect_sent == false) { |
0 | 415 | mosquitto_disconnect(mosq); |
747
b6fbe6821468
Replace TRUE and FALSE for lowercase version.
Michiel Broek <mbroek@mbse.eu>
parents:
679
diff
changeset
|
416 | mqtt_disconnect_sent = true; |
0 | 417 | } |
418 | usleep(100000); | |
419 | } | |
420 | rc = MOSQ_ERR_SUCCESS; | |
421 | } while (rc == MOSQ_ERR_SUCCESS && mqtt_connected); | |
422 | ||
747
b6fbe6821468
Replace TRUE and FALSE for lowercase version.
Michiel Broek <mbroek@mbse.eu>
parents:
679
diff
changeset
|
423 | mosquitto_loop_stop(mosq, false); |
0 | 424 | mosquitto_destroy(mosq); |
425 | mosquitto_lib_cleanup(); | |
747
b6fbe6821468
Replace TRUE and FALSE for lowercase version.
Michiel Broek <mbroek@mbse.eu>
parents:
679
diff
changeset
|
426 | mqtt_use = false; |
0 | 427 | mqtt_status = STATUS_CONNECTING; |
428 | mqtt_mid_sent = 0; | |
429 | mqtt_last_mid = -1; | |
430 | mqtt_last_mid_sent = -1; | |
747
b6fbe6821468
Replace TRUE and FALSE for lowercase version.
Michiel Broek <mbroek@mbse.eu>
parents:
679
diff
changeset
|
431 | mqtt_connected = true; |
b6fbe6821468
Replace TRUE and FALSE for lowercase version.
Michiel Broek <mbroek@mbse.eu>
parents:
679
diff
changeset
|
432 | mqtt_disconnect_sent = false; |
b6fbe6821468
Replace TRUE and FALSE for lowercase version.
Michiel Broek <mbroek@mbse.eu>
parents:
679
diff
changeset
|
433 | mqtt_connect_lost = false; |
b6fbe6821468
Replace TRUE and FALSE for lowercase version.
Michiel Broek <mbroek@mbse.eu>
parents:
679
diff
changeset
|
434 | mqtt_my_shutdown = false; |
0 | 435 | syslog(LOG_NOTICE, "MQTT: disconnected"); |
436 | } | |
437 | } | |
438 | ||
439 |