main/task_mqtt.c

changeset 7
2b337dd92f25
parent 6
bad3414f7bc4
child 9
1659bd3c7a2b
equal deleted inserted replaced
6:bad3414f7bc4 7:2b337dd92f25
15 esp_mqtt_client_handle_t client; ///< MQTT client handle 15 esp_mqtt_client_handle_t client; ///< MQTT client handle
16 16
17 const int TASK_MQTT_CONNECT = BIT0; ///< Request MQTT connection 17 const int TASK_MQTT_CONNECT = BIT0; ///< Request MQTT connection
18 const int TASK_MQTT_DISCONNECT = BIT1; ///< Request MQTT disconnect 18 const int TASK_MQTT_DISCONNECT = BIT1; ///< Request MQTT disconnect
19 const int TASK_MQTT_CONNECTED = BIT2; ///< MQTT is connected 19 const int TASK_MQTT_CONNECTED = BIT2; ///< MQTT is connected
20 const int TASK_MQTT_DISCONNECTED = BIT3;
20 21
21 const char *sensState[] = { "OK", "ERROR" }; ///< Sensor state strings 22 const char *sensState[] = { "OK", "ERROR" }; ///< Sensor state strings
22 const char *unitMode[] = { "OFF", "ON" }; ///< Units state strings 23 const char *unitMode[] = { "OFF", "ON" }; ///< Units state strings
23 24
24 extern BMP280_State *bmp280_state; ///< BMP280 state 25 extern BMP280_State *bmp280_state; ///< BMP280 state
56 return false; 57 return false;
57 } 58 }
58 59
59 60
60 61
62 void wait_mqtt(int time)
63 {
64 EventBits_t uxBits;
65
66 ESP_LOGI(TAG, "wait_mqtt(%d) 1", time);
67 if (xEventGroupGetBits(xEventGroupMQTT) & TASK_MQTT_CONNECTED) {
68 uxBits = xEventGroupWaitBits(xEventGroupMQTT, TASK_MQTT_DISCONNECTED, pdTRUE, pdFALSE, time / portTICK_PERIOD_MS);
69 ESP_LOGI(TAG, "wait_mqtt(%d) 2 %lu", time, uxBits & TASK_MQTT_DISCONNECTED);
70 } else {
71 ESP_LOGI(TAG, "wait_mqtt(%d) 3 not connected", time);
72 }
73 }
74
75
76
61 /** 77 /**
62 * @brief Generate the mqtt payload header. 78 * @brief Generate the mqtt payload header.
63 * @return Allocated character string with the header. 79 * @return Allocated character string with the header.
64 */ 80 */
65 char *payload_header(void) 81 char *payload_header(void)
117 133
118 134
119 135
120 void publish(void) 136 void publish(void)
121 { 137 {
122 char *topic = NULL, *payload = NULL, buf[64]; 138 char *topic = NULL, *payload = NULL, buf[64];
139 const esp_app_desc_t *app_desc = esp_app_get_description();
123 140
124 // {"system":{"battery":70,"alarm":0,"version":"0.2.6","rssi":-56,"wifi":88,"light":{"lux":12.34,"gain":2}},"solar":{"voltage":13.98,"current":234.1,"power":3.272718},"battery":{"voltage":13.21,"current":4.942289,"power":0.065288},"real":{"current":229.1577},"TH":{"temperature":20.2,"humidity":48.3},"output":{"relay1":0,"relay2":0,"dimmer3":0,"dimmer4":0}} 141 // {"system":{"battery":70,"alarm":0,"version":"0.2.6","rssi":-56,"wifi":88,"light":{"lux":12.34,"gain":2}},"solar":{"voltage":13.98,"current":234.1,"power":3.272718},"battery":{"voltage":13.21,"current":4.942289,"power":0.065288},"real":{"current":229.1577},"TH":{"temperature":20.2,"humidity":48.3},"output":{"relay1":0,"relay2":0,"dimmer3":0,"dimmer4":0}}
125 // 142 //
126 payload = payload_header(); 143 payload = payload_header();
127 payload = xstrcat(payload, (char *)"{\"system\":{\"battery\":"); 144 payload = xstrcat(payload, (char *)"{\"system\":{\"battery\":");
130 payload = xstrcat(payload, (char *)",\"alarm\":"); 147 payload = xstrcat(payload, (char *)",\"alarm\":");
131 sprintf(buf, "%ld", Alarm); 148 sprintf(buf, "%ld", Alarm);
132 payload = xstrcat(payload, buf); 149 payload = xstrcat(payload, buf);
133 payload = xstrcat(payload, (char *)",\"version\":\""); 150 payload = xstrcat(payload, (char *)",\"version\":\"");
134 payload = xstrcat(payload, (char *)app_desc->version); 151 payload = xstrcat(payload, (char *)app_desc->version);
135 payload = xstrcat(payload, (char *)",\"rssi\":"); 152 payload = xstrcat(payload, (char *)"\",\"rssi\":");
136 153 if (xSemaphoreTake(xSemaphoreWiFi, 25) == pdTRUE) {
154 sprintf(buf, "%d", wifi_state->STA_rssi);
155 payload = xstrcat(payload, buf);
156 xSemaphoreGive(xSemaphoreWiFi);
157 }
137 payload = xstrcat(payload, (char *)",\"light\":{\"lux\":"); 158 payload = xstrcat(payload, (char *)",\"light\":{\"lux\":");
138 159
139 payload = xstrcat(payload, (char *)",\"gain\":"); 160 payload = xstrcat(payload, (char *)",\"gain\":");
140 161
141 payload = xstrcat(payload, (char *)"}},\"solar\":{\"voltage\":"); 162 payload = xstrcat(payload, (char *)"}},\"solar\":{\"voltage\":");
155 payload = xstrcat(payload, buf); 176 payload = xstrcat(payload, buf);
156 payload = xstrcat(payload, (char *)",\"power\":"); 177 payload = xstrcat(payload, (char *)",\"power\":");
157 sprintf(buf, "%.3f", batteryPower / 1000.0); 178 sprintf(buf, "%.3f", batteryPower / 1000.0);
158 payload = xstrcat(payload, buf); 179 payload = xstrcat(payload, buf);
159 payload = xstrcat(payload, (char *)"},\"real\":{\"current\":"); 180 payload = xstrcat(payload, (char *)"},\"real\":{\"current\":");
181 sprintf(buf, "%.1f", (0 - batteryCurrent) + solarCurrent);
182 payload = xstrcat(payload, buf);
160 payload = xstrcat(payload, (char *)"},\"TB\":{\"temperature\":"); 183 payload = xstrcat(payload, (char *)"},\"TB\":{\"temperature\":");
161 if (xSemaphoreTake(xSemaphoreBMP280, 25) == pdTRUE) { 184 if (xSemaphoreTake(xSemaphoreBMP280, 25) == pdTRUE) {
162 sprintf(buf, "%.2f", bmp280_state->temperature); 185 sprintf(buf, "%.2f", bmp280_state->temperature);
163 payload = xstrcat(payload, buf); 186 payload = xstrcat(payload, buf);
164 payload = xstrcat(payload, (char *)",\"pressure\":"); 187 payload = xstrcat(payload, (char *)",\"pressure\":");
175 payload = xstrcat(payload, (char *)",\"dimmer4\":"); 198 payload = xstrcat(payload, (char *)",\"dimmer4\":");
176 199
177 payload = xstrcat(payload, (char *)"}}"); 200 payload = xstrcat(payload, (char *)"}}");
178 topic = topic_base(); 201 topic = topic_base();
179 topic = xstrcat(topic, (char *)"status"); 202 topic = xstrcat(topic, (char *)"status");
180 publisher(topic, payload); 203 ESP_LOGI(TAG, "%s %s", topic, payload);
204 // publisher(topic, payload);
181 free(topic); 205 free(topic);
182 topic = NULL; 206 topic = NULL;
183 free(payload); 207 free(payload);
184 payload = NULL; 208 payload = NULL;
185 } 209 }
195 char *topic = topic_base(); 219 char *topic = topic_base();
196 topic = xstrcat(topic, (char *)"output/set/#"); 220 topic = xstrcat(topic, (char *)"output/set/#");
197 ESP_LOGI(TAG, "Subscribe `%s' id %d", topic, esp_mqtt_client_subscribe(client, topic, 0)); 221 ESP_LOGI(TAG, "Subscribe `%s' id %d", topic, esp_mqtt_client_subscribe(client, topic, 0));
198 free(topic); 222 free(topic);
199 xEventGroupSetBits(xEventGroupMQTT, TASK_MQTT_CONNECTED); 223 xEventGroupSetBits(xEventGroupMQTT, TASK_MQTT_CONNECTED);
224 xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_DISCONNECTED);
200 break; 225 break;
201 226
202 case MQTT_EVENT_DISCONNECTED: 227 case MQTT_EVENT_DISCONNECTED:
203 ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED"); 228 ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED");
204 xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECTED); 229 xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECTED);
230 xEventGroupSetBits(xEventGroupMQTT, TASK_MQTT_DISCONNECTED);
205 break; 231 break;
206 232
207 case MQTT_EVENT_SUBSCRIBED: 233 case MQTT_EVENT_SUBSCRIBED:
208 ESP_LOGI(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d", event->msg_id); 234 ESP_LOGI(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d", event->msg_id);
209 break; 235 break;
266 xSemaphorePcounter = xSemaphoreCreateMutex(); 292 xSemaphorePcounter = xSemaphoreCreateMutex();
267 293
268 /* event handler and event group for the wifi driver */ 294 /* event handler and event group for the wifi driver */
269 xEventGroupMQTT = xEventGroupCreate(); 295 xEventGroupMQTT = xEventGroupCreate();
270 EventBits_t uxBits; 296 EventBits_t uxBits;
297 xEventGroupSetBits(xEventGroupMQTT, TASK_MQTT_DISCONNECTED);
271 298
272 esp_mqtt_client_config_t mqtt_cfg = { 299 esp_mqtt_client_config_t mqtt_cfg = {
273 .broker.address.uri = "mqtt://localhost", 300 .broker.address.uri = "mqtt://localhost",
274 }; 301 };
275 client = esp_mqtt_client_init(&mqtt_cfg); 302 client = esp_mqtt_client_init(&mqtt_cfg);
281 while (1) { 308 while (1) {
282 309
283 uxBits = xEventGroupWaitBits(xEventGroupMQTT, TASK_MQTT_CONNECT | TASK_MQTT_DISCONNECT, pdFALSE, pdFALSE, portMAX_DELAY ); 310 uxBits = xEventGroupWaitBits(xEventGroupMQTT, TASK_MQTT_CONNECT | TASK_MQTT_DISCONNECT, pdFALSE, pdFALSE, portMAX_DELAY );
284 311
285 if (uxBits & TASK_MQTT_CONNECT) { 312 if (uxBits & TASK_MQTT_CONNECT) {
313 /*
314 * First build the connect uri.
315 */
286 uri = xstrcpy((char *)"mqtt://"); 316 uri = xstrcpy((char *)"mqtt://");
287 if (strlen(CONFIG_MQTT_USER) && strlen(CONFIG_MQTT_PASS)) { 317 if (strlen(CONFIG_MQTT_USER) && strlen(CONFIG_MQTT_PASS)) {
288 uri = xstrcat(uri, CONFIG_MQTT_USER); 318 uri = xstrcat(uri, CONFIG_MQTT_USER);
289 uri = xstrcat(uri, (char *)":"); 319 uri = xstrcat(uri, (char *)":");
290 uri = xstrcat(uri, CONFIG_MQTT_PASS); 320 uri = xstrcat(uri, CONFIG_MQTT_PASS);
294 if (CONFIG_MQTT_PORT != 1883) { 324 if (CONFIG_MQTT_PORT != 1883) {
295 uri = xstrcat(uri, (char *)":"); 325 uri = xstrcat(uri, (char *)":");
296 sprintf(port, "%d", CONFIG_MQTT_PORT); 326 sprintf(port, "%d", CONFIG_MQTT_PORT);
297 uri = xstrcat(uri, port); 327 uri = xstrcat(uri, port);
298 } 328 }
299
300 ESP_LOGI(TAG, "Request MQTT connect %s", uri); 329 ESP_LOGI(TAG, "Request MQTT connect %s", uri);
330
331 /*
332 * Connect to the broker.
333 */
301 err = esp_mqtt_client_set_uri(client, uri); 334 err = esp_mqtt_client_set_uri(client, uri);
302 if (err != ESP_OK) 335 if (err != ESP_OK)
303 ESP_LOGE(TAG, "Set uri %s", esp_err_to_name(err)); 336 ESP_LOGE(TAG, "Set uri %s", esp_err_to_name(err));
304 err = esp_mqtt_client_start(client); 337 err = esp_mqtt_client_start(client);
305 if (err != ESP_OK) { 338 if (err != ESP_OK) {
307 } 340 }
308 xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECT); 341 xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECT);
309 342
310 } else if (uxBits & TASK_MQTT_DISCONNECT) { 343 } else if (uxBits & TASK_MQTT_DISCONNECT) {
311 ESP_LOGI(TAG, "Request MQTT disconnect"); 344 ESP_LOGI(TAG, "Request MQTT disconnect");
345 /*
346 * Unsubscribe if connected
347 */
312 if (ready_mqtt()) { 348 if (ready_mqtt()) {
313 char *topic = topic_base(); 349 char *topic = topic_base();
314 topic = xstrcat(topic, (char *)"output/set/#"); 350 topic = xstrcat(topic, (char *)"output/set/#");
315 esp_mqtt_client_unsubscribe(client, topic); 351 esp_mqtt_client_unsubscribe(client, topic);
316 free(topic); 352 free(topic);
317 } 353 }
318 esp_mqtt_client_stop(client); 354
355 /*
356 * Disconnect from broker and wait until confirmed.
357 */
358 err = esp_mqtt_client_disconnect(client);
359 if (err != ESP_OK) {
360 ESP_LOGE(TAG, "Result %s", esp_err_to_name(err));
361 }
362 xEventGroupWaitBits(xEventGroupMQTT, TASK_MQTT_DISCONNECTED, pdTRUE, pdFALSE, 500 / portTICK_PERIOD_MS);
363 ESP_LOGI(TAG, "disconnect confirmed");
364
365 /*
366 * Finally stop the client because new connections start
367 * with a 'esp_mqtt_client_start()' command.
368 * This will take about 5 seconds, but we don't need the network.
369 */
370 err = esp_mqtt_client_stop(client);
371 if (err != ESP_OK) {
372 ESP_LOGE(TAG, "Result %s", esp_err_to_name(err));
373 } else {
374 ESP_LOGI(TAG, "stopped");
375 }
376
319 xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_DISCONNECT); 377 xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_DISCONNECT);
320 xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECTED);
321 } 378 }
322 } 379 }
323 } 380 }
324 381

mercurial