thermferm/mqtt.c

changeset 499
602d9968960f
child 500
5aa914eb644e
equal deleted inserted replaced
498:4903b4da9d40 499:602d9968960f
1 /*****************************************************************************
2 * Copyright (C) 2016
3 *
4 * Michiel Broek <mbroek at mbse dot eu>
5 *
6 * This file is part of the mbsePi-apps
7 *
8 * This is free software; you can redistribute it and/or modify it
9 * under the terms of the GNU General Public License as published by the
10 * Free Software Foundation; either version 2, or (at your option) any
11 * later version.
12 *
13 * mbsePi-apps is distributed in the hope that it will be useful, but
14 * WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 * General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with ThermFerm; see the file COPYING. If not, write to the Free
20 * Software Foundation, 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
21 *****************************************************************************/
22
23 #include "thermferm.h"
24 #include "xutil.h"
25 #include "mqtt.h"
26
27 extern sys_config Config;
28 extern int debug;
29
30 #ifdef HAVE_MOSQUITTO_H
31
32
33 /* Global variables for use in callbacks. */
34 int mqtt_qos = 0;
35 int mqtt_status = STATUS_CONNECTING;
36 int mqtt_mid_sent = 0;
37 int mqtt_last_mid = -1;
38 int mqtt_last_mid_sent = -1;
39 int mqtt_connected = TRUE;
40 int mqtt_disconnect_sent = FALSE;
41 int mqtt_connect_lost = FALSE;
42 int mqtt_my_shutdown = FALSE;
43 int mqtt_use = FALSE;
44 int keepalive = 60;
45 unsigned int max_inflight = 20;
46 struct mosquitto *mosq = NULL;
47 char *state = NULL;
48 char my_hostname[256];
49
50
51 void my_connect_callback(struct mosquitto *my_mosq, void *obj, int result)
52 {
53 if (mqtt_connect_lost) {
54 mqtt_connect_lost = FALSE;
55 syslog(LOG_NOTICE, "MQTT: reconnect: %s", mosquitto_connack_string(result));
56 }
57
58 if (!result) {
59 mqtt_status = STATUS_CONNACK_RECVD;
60 } else {
61 syslog(LOG_NOTICE, "MQTT: my_connect_callback: %s\n", mosquitto_connack_string(result));
62 }
63 }
64
65
66
67 void my_disconnect_callback(struct mosquitto *my_mosq, void *obj, int rc)
68 {
69 if (mqtt_my_shutdown) {
70 syslog(LOG_NOTICE, "MQTT: acknowledged DISCONNECT from %s", Config.mosq_host);
71 mqtt_connected = FALSE;
72 } else {
73 /*
74 * The remote server was brought down. We must keep running
75 */
76 syslog(LOG_NOTICE, "MQTT: received DISCONNECT from %s, connection lost", Config.mosq_host);
77 mqtt_connect_lost = TRUE;
78 }
79 }
80
81
82
83 void my_publish_callback(struct mosquitto *my_mosq, void *obj, int mid)
84 {
85 mqtt_last_mid_sent = mid;
86 }
87
88
89
90 void my_log_callback(struct mosquitto *my_mosq, void *obj, int level, const char *str)
91 {
92 syslog(LOG_NOTICE, "MQTT: %s", str);
93 if (debug)
94 fprintf(stdout, "MQTT: %s\n", str);
95 }
96
97
98
99 #endif
100
101
102 void mqtt_connect(void)
103 {
104 #ifdef HAVE_MOSQUITTO_H
105 char *id = NULL;
106 char err[1024];
107 int rc;
108
109 /*
110 * Initialize mosquitto communication
111 */
112 gethostname(my_hostname, 255);
113 mosquitto_lib_init();
114 id = xstrcpy((char *)"thermferm/");
115 id = xstrcat(id, my_hostname);
116 if (strlen(id) > MOSQ_MQTT_ID_MAX_LENGTH) {
117 /*
118 * Enforce maximum client id length of 23 characters
119 */
120 id[MOSQ_MQTT_ID_MAX_LENGTH] = '\0';
121 }
122
123 mosq = mosquitto_new(id, TRUE, NULL);
124 if (!mosq) {
125 switch(errno) {
126 case ENOMEM:
127 syslog(LOG_NOTICE, "MQTT: mosquitto_new: Out of memory");
128 break;
129 case EINVAL:
130 syslog(LOG_NOTICE, "MQTT: mosquitto_new: Invalid id");
131 break;
132 }
133 mosquitto_lib_cleanup();
134 return;
135 }
136
137 if (debug) {
138 mosquitto_log_callback_set(mosq, my_log_callback);
139 }
140
141 /*
142 * Set our will
143 */
144 state = xstrcpy((char *)"clients/");
145 state = xstrcat(state, my_hostname);
146 state = xstrcat(state, (char *)"/thermferm/state");
147 if ((rc = mosquitto_will_set(mosq, state, 1, (char *)"0", mqtt_qos, TRUE))) {
148 if (rc == MOSQ_ERR_INVAL) {
149 syslog(LOG_NOTICE, "MQTT: mosquitto_will_set: input parameters invalid");
150 } else if (rc == MOSQ_ERR_NOMEM) {
151 syslog(LOG_NOTICE, "MQTT: mosquitto_will_set: Out of Memory");
152 } else if (rc == MOSQ_ERR_PAYLOAD_SIZE) {
153 syslog(LOG_NOTICE, "MQTT: mosquitto_will_set: invalid payload size");
154 }
155 mosquitto_lib_cleanup();
156 return;
157 }
158
159 mosquitto_max_inflight_messages_set(mosq, max_inflight);
160 mosquitto_connect_callback_set(mosq, my_connect_callback);
161 mosquitto_disconnect_callback_set(mosq, my_disconnect_callback);
162 mosquitto_publish_callback_set(mosq, my_publish_callback);
163
164 if ((rc = mosquitto_connect(mosq, Config.mosq_host, Config.mosq_port, keepalive))) {
165 if (rc == MOSQ_ERR_ERRNO) {
166 strerror_r(errno, err, 1024);
167 syslog(LOG_NOTICE, "MQTT: mosquitto_connect: error: %s", err);
168 } else {
169 syslog(LOG_NOTICE, "MQTT: mosquitto_connect: unable to connect (%d)", rc);
170 }
171 mosquitto_lib_cleanup();
172 syslog(LOG_NOTICE, "MQTT: will run without an MQTT broker.");
173 } else {
174 mqtt_use = TRUE;
175 syslog(LOG_NOTICE, "MQTT: connected with %s:%d", Config.mosq_host, Config.mosq_port);
176
177 /*
178 * Initialise is complete, report our presence state
179 */
180 mosquitto_loop_start(mosq);
181 mosquitto_publish(mosq, &mqtt_mid_sent, state, 1, (char *)"1", mqtt_qos, 1);
182 }
183 #endif
184 }
185
186
187
188 void mqtt_disconnect(void)
189 {
190 #ifdef HAVE_MOSQUITTO_H
191 int rc;
192 char buf[128];
193
194 if (mqtt_use) {
195 /*
196 * Final publish 0 to clients/<hostname>/thermferm/state
197 */
198 syslog(LOG_NOTICE, "MQTT disconnecting");
199 sprintf(buf, "0");
200 mosquitto_publish(mosq, &mqtt_mid_sent, state, strlen(buf), buf, mqtt_qos, true);
201 mqtt_last_mid = mqtt_mid_sent;
202 mqtt_status = STATUS_WAITING;
203 mqtt_my_shutdown = TRUE;
204
205 do {
206 if (mqtt_status == STATUS_WAITING) {
207 if (debug)
208 fprintf(stdout, (char *)"Waiting\n");
209 if (mqtt_last_mid_sent == mqtt_last_mid && mqtt_disconnect_sent == FALSE) {
210 mosquitto_disconnect(mosq);
211 mqtt_disconnect_sent = TRUE;
212 }
213 usleep(100000);
214 }
215 rc = MOSQ_ERR_SUCCESS;
216 } while (rc == MOSQ_ERR_SUCCESS && mqtt_connected);
217
218 mosquitto_loop_stop(mosq, FALSE);
219 mosquitto_destroy(mosq);
220 mosquitto_lib_cleanup();
221 syslog(LOG_NOTICE, "MQTT disconnected");
222 }
223 #endif
224 }
225
226
227
228 void mqtt_publish_int(char *uuid, char *tail, int value)
229 {
230 #ifdef HAVE_MOSQUITTO_H
231 char topic[1024], buf[128];
232
233 if (mqtt_use) {
234 snprintf(topic, 1023, "fermenter/%s/%s/%s", my_hostname, uuid, tail);
235 snprintf(buf, 127, "%d", value);
236 mosquitto_publish(mosq, &mqtt_mid_sent, topic, strlen(buf), buf, mqtt_qos, 1);
237 }
238 #endif
239 }
240
241
242
243 void mqtt_publish_float(char *uuid, char *tail, float value, int decimals)
244 {
245 #ifdef HAVE_MOSQUITTO_H
246 char topic[1024], buf[128];
247
248 if (mqtt_use) {
249 snprintf(topic, 1023, "fermenter/%s/%s/%s", my_hostname, uuid, tail);
250 snprintf(buf, 127, "%.*f", decimals, value);
251 mosquitto_publish(mosq, &mqtt_mid_sent, topic, strlen(buf), buf, mqtt_qos, 1);
252 }
253 #endif
254 }
255
256
257
258 void mqtt_publish_str(char *uuid, char *tail, char *value)
259 {
260 #ifdef HAVE_MOSQUITTO_H
261 char topic[1024], buf[128];
262
263 if (mqtt_use) {
264 snprintf(topic, 1023, "fermenter/%s/%s/%s", my_hostname, uuid, tail);
265 snprintf(buf, 127, "%s", value);
266 mosquitto_publish(mosq, &mqtt_mid_sent, topic, strlen(buf), buf, mqtt_qos, 1);
267 }
268 #endif
269 }
270

mercurial