main/task_mqtt.c

changeset 77
15dc572a7fcb
parent 69
5437e0514d59
equal deleted inserted replaced
76:0432d9147682 77:15dc572a7fcb
15 esp_mqtt_client_handle_t client; ///< MQTT client handle 15 esp_mqtt_client_handle_t client; ///< MQTT client handle
16 16
17 const int TASK_MQTT_CONNECT = BIT0; ///< Request MQTT connection 17 const int TASK_MQTT_CONNECT = BIT0; ///< Request MQTT connection
18 const int TASK_MQTT_DISCONNECT = BIT1; ///< Request MQTT disconnect 18 const int TASK_MQTT_DISCONNECT = BIT1; ///< Request MQTT disconnect
19 const int TASK_MQTT_CONNECTED = BIT2; ///< MQTT is connected 19 const int TASK_MQTT_CONNECTED = BIT2; ///< MQTT is connected
20 const int TASK_MQTT_STARTED = BIT3; ///< MQTT is started
20 21
21 const char *sensState[] = { "OK", "ERROR" }; ///< Sensor state strings 22 const char *sensState[] = { "OK", "ERROR" }; ///< Sensor state strings
22 const char *unitMode[] = { "OFF", "ON" }; ///< Units state strings 23 const char *unitMode[] = { "OFF", "ON" }; ///< Units state strings
23 24
24 extern DS18B20_State *ds18b20_state; ///< DS18B20 state 25 extern DS18B20_State *ds18b20_state; ///< DS18B20 state
28 extern WIFI_State *wifi_state; ///< WiFi state 29 extern WIFI_State *wifi_state; ///< WiFi state
29 extern SemaphoreHandle_t xSemaphoreWiFi; ///< WiFi lock semaphore 30 extern SemaphoreHandle_t xSemaphoreWiFi; ///< WiFi lock semaphore
30 extern unit_t units[3]; 31 extern unit_t units[3];
31 extern SemaphoreHandle_t xSemaphoreUnits; 32 extern SemaphoreHandle_t xSemaphoreUnits;
32 extern const esp_app_desc_t *app_desc; 33 extern const esp_app_desc_t *app_desc;
33 extern strConfig_t config; 34 extern char hostname[];
34 35 extern char uuid[];
35 36
36 void connect_mqtt(bool state) 37 void connect_mqtt(bool state)
37 { 38 {
38 if (state) 39 if (state)
39 xEventGroupSetBits(xEventGroupMQTT, TASK_MQTT_CONNECT); 40 xEventGroupSetBits(xEventGroupMQTT, TASK_MQTT_CONNECT);
76 char *tmp; 77 char *tmp;
77 78
78 tmp = xstrcpy((char *)"mbv1.0/co2meters/"); 79 tmp = xstrcpy((char *)"mbv1.0/co2meters/");
79 tmp = xstrcat(tmp, msgtype); 80 tmp = xstrcat(tmp, msgtype);
80 tmp = xstrcat(tmp, (char *)"/"); 81 tmp = xstrcat(tmp, (char *)"/");
81 tmp = xstrcat(tmp, config.hostname); 82 tmp = xstrcat(tmp, hostname);
82 return tmp; 83 return tmp;
83 } 84 }
84 85
85 86
86 87
198 esp_chip_info_t chip_info; 199 esp_chip_info_t chip_info;
199 200
200 esp_chip_info(&chip_info); 201 esp_chip_info(&chip_info);
201 payload = payload_header(); 202 payload = payload_header();
202 payload = xstrcat(payload, (char *)"{\"uuid\":\""); 203 payload = xstrcat(payload, (char *)"{\"uuid\":\"");
203 payload = xstrcat(payload, config.uuid); 204 payload = xstrcat(payload, uuid);
204 payload = xstrcat(payload, (char *)"\",\"interval\":"); 205 payload = xstrcat(payload, (char *)"\",\"interval\":");
205 sprintf(buf, "%d", MAINLOOP_TIMER); 206 sprintf(buf, "%d", MAINLOOP_TIMER);
206 payload = xstrcat(payload, buf); 207 payload = xstrcat(payload, buf);
207 payload = xstrcat(payload, (char *)",\"properties\":{\"hardwaremake\":\"Unknown\",\"hardwaremodel\":\""); 208 payload = xstrcat(payload, (char *)",\"properties\":{\"hardwaremake\":\"Unknown\",\"hardwaremodel\":\"");
208 sprintf(buf, "ESP32 %d cores rev %d, WiFi bgn", chip_info.cores, chip_info.revision); 209 if (chip_info.model == CHIP_ESP32)
210 payload = xstrcat(payload, (char *)"ESP32");
211 else if (chip_info.model == CHIP_ESP32S2)
212 payload = xstrcat(payload, (char *)"ESP32-S2");
213 else if (chip_info.model == CHIP_ESP32S3)
214 payload = xstrcat(payload, (char *)"ESP32-S3");
215 else if (chip_info.model == CHIP_ESP32C3)
216 payload = xstrcat(payload, (char *)"ESP32-C3");
217 else if (chip_info.model == CHIP_ESP32C2)
218 payload = xstrcat(payload, (char *)"ESP32-C2");
219 else if (chip_info.model == CHIP_ESP32C6)
220 payload = xstrcat(payload, (char *)"ESP32-C6");
221 else if (chip_info.model == CHIP_ESP32H2)
222 payload = xstrcat(payload, (char *)"ESP32-H2");
223 else if (chip_info.model == CHIP_POSIX_LINUX)
224 payload = xstrcat(payload, (char *)"Posix Linux");
225 else
226 payload = xstrcat(payload, (char *)"Unknown");
227 sprintf(buf, " %d cores rev %d, WiFi bgn", chip_info.cores, chip_info.revision);
209 payload = xstrcat(payload, buf); 228 payload = xstrcat(payload, buf);
210 payload = xstrcat(payload, (char *)"\",\"os\":\"esp-idf\",\"os_version\":\""); 229 payload = xstrcat(payload, (char *)"\",\"os\":\"esp-idf\",\"os_version\":\"");
211 payload = xstrcat(payload, (char *)esp_get_idf_version()); 230 payload = xstrcat(payload, (char *)esp_get_idf_version());
212 payload = xstrcat(payload, (char *)"\",\"FW\":\""); 231 payload = xstrcat(payload, (char *)"\",\"FW\":\"");
213 payload = xstrcat(payload, (char *)app_desc->version); 232 payload = xstrcat(payload, (char *)app_desc->version);
228 payload = xstrcat(payload, wifi_state->STA_ip); 247 payload = xstrcat(payload, wifi_state->STA_ip);
229 payload = xstrcat(payload, (char *)"\",\"ifname\":\"sta\",\"ssid\":\""); 248 payload = xstrcat(payload, (char *)"\",\"ifname\":\"sta\",\"ssid\":\"");
230 payload = xstrcat(payload, wifi_state->STA_ssid); 249 payload = xstrcat(payload, wifi_state->STA_ssid);
231 payload = xstrcat(payload, (char *)"\",\"rssi\":"); 250 payload = xstrcat(payload, (char *)"\",\"rssi\":");
232 sprintf(buf, "%d", wifi_state->STA_rssi); 251 sprintf(buf, "%d", wifi_state->STA_rssi);
252 payload = xstrcat(payload, buf);
253 payload = xstrcat(payload, (char *)",\"bssid\":\"");
254 payload = xstrcat(payload, wifi_state->STA_bssid);
255 payload = xstrcat(payload, (char *)"\",\"channel\":");
256 sprintf(buf, "%d", wifi_state->STA_channel);
233 payload = xstrcat(payload, buf); 257 payload = xstrcat(payload, buf);
234 payload = xstrcat(payload, (char *)"}"); 258 payload = xstrcat(payload, (char *)"}");
235 xSemaphoreGive(xSemaphoreWiFi); 259 xSemaphoreGive(xSemaphoreWiFi);
236 } else { 260 } else {
237 ESP_LOGE(TAG, "publishNode() lock WiFi error"); 261 ESP_LOGE(TAG, "publishNode() lock WiFi error");
287 static esp_err_t mqtt_event_handler_cb(esp_mqtt_event_handle_t event) 311 static esp_err_t mqtt_event_handler_cb(esp_mqtt_event_handle_t event)
288 { 312 {
289 switch (event->event_id) { 313 switch (event->event_id) {
290 314
291 case MQTT_EVENT_CONNECTED: 315 case MQTT_EVENT_CONNECTED:
292 ESP_LOGD(TAG, "MQTT_EVENT_CONNECTED"); 316 ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED");
293 xEventGroupSetBits(xEventGroupMQTT, TASK_MQTT_CONNECTED); 317 xEventGroupSetBits(xEventGroupMQTT, TASK_MQTT_CONNECTED);
318 xEventGroupSetBits(xEventGroupMQTT, TASK_MQTT_STARTED);
294 break; 319 break;
295 320
296 case MQTT_EVENT_DISCONNECTED: 321 case MQTT_EVENT_DISCONNECTED:
297 ESP_LOGD(TAG, "MQTT_EVENT_DISCONNECTED"); 322 ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED");
298 xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECTED); 323 xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECTED);
299 break; 324 break;
300 325
301 case MQTT_EVENT_SUBSCRIBED: 326 case MQTT_EVENT_SUBSCRIBED:
302 ESP_LOGD(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d", event->msg_id); 327 ESP_LOGI(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d", event->msg_id);
303 break; 328 break;
304 329
305 case MQTT_EVENT_UNSUBSCRIBED: 330 case MQTT_EVENT_UNSUBSCRIBED:
306 ESP_LOGI(TAG, "MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event->msg_id); 331 ESP_LOGI(TAG, "MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event->msg_id);
307 break; 332 break;
308 333
309 case MQTT_EVENT_PUBLISHED: 334 case MQTT_EVENT_PUBLISHED:
310 ESP_LOGD(TAG, "MQTT_EVENT_PUBLISHED, msg_id=%d", event->msg_id); 335 ESP_LOGI(TAG, "MQTT_EVENT_PUBLISHED, msg_id=%d", event->msg_id);
311 if (xSemaphoreTake(xSemaphorePcounter, 10) == pdTRUE) { 336 if (xSemaphoreTake(xSemaphorePcounter, 10) == pdTRUE) {
312 if (count_pub) { 337 if (count_pub) {
313 count_pub--; 338 count_pub--;
314 } 339 }
315 xSemaphoreGive(xSemaphorePcounter); 340 xSemaphoreGive(xSemaphorePcounter);
376 while (1) { 401 while (1) {
377 402
378 uxBits = xEventGroupWaitBits(xEventGroupMQTT, TASK_MQTT_CONNECT | TASK_MQTT_DISCONNECT, pdFALSE, pdFALSE, portMAX_DELAY ); 403 uxBits = xEventGroupWaitBits(xEventGroupMQTT, TASK_MQTT_CONNECT | TASK_MQTT_DISCONNECT, pdFALSE, pdFALSE, portMAX_DELAY );
379 404
380 if (uxBits & TASK_MQTT_CONNECT) { 405 if (uxBits & TASK_MQTT_CONNECT) {
381 if (strlen(config.mqtt_server)) { 406 if (! (xEventGroupGetBits(xEventGroupMQTT) & TASK_MQTT_CONNECTED)) {
382 uri = xstrcpy((char *)"mqtt://"); 407 if (strlen(CONFIG_MQTT_SERVER)) {
383 if (strlen(config.mqtt_user) && strlen(config.mqtt_pwd)) { 408 uri = xstrcpy((char *)"mqtt://");
384 uri = xstrcat(uri, config.mqtt_user); 409 if (strlen(CONFIG_MQTT_USER) && strlen(CONFIG_MQTT_PASS)) {
385 uri = xstrcat(uri, (char *)":"); 410 uri = xstrcat(uri, CONFIG_MQTT_USER);
386 uri = xstrcat(uri, config.mqtt_pwd); 411 uri = xstrcat(uri, (char *)":");
387 uri = xstrcat(uri, (char *)"@"); 412 uri = xstrcat(uri, CONFIG_MQTT_PASS);
413 uri = xstrcat(uri, (char *)"@");
414 }
415 uri = xstrcat(uri, CONFIG_MQTT_SERVER);
416 if (CONFIG_MQTT_PORT != 1883) {
417 uri = xstrcat(uri, (char *)":");
418 sprintf(port, "%d", CONFIG_MQTT_PORT);
419 uri = xstrcat(uri, port);
420 }
421 } else {
422 uri = xstrcpy((char *)"mqtt://iot.eclipse.org:1883");
388 } 423 }
389 uri = xstrcat(uri, config.mqtt_server); 424
390 if (config.mqtt_port != 1883) { 425 ESP_LOGI(TAG, "Request MQTT connect %s", uri);
391 uri = xstrcat(uri, (char *)":"); 426 err = esp_mqtt_client_set_uri(client, uri);
392 sprintf(port, "%d", config.mqtt_port); 427 if (err != ESP_OK)
393 uri = xstrcat(uri, port); 428 ESP_LOGE(TAG, "Set uri %s", esp_err_to_name(err));
429
430 if (xEventGroupGetBits(xEventGroupMQTT) & TASK_MQTT_STARTED) {
431 /* Existing session */
432 err = esp_mqtt_client_reconnect(client);
433 if (err != ESP_OK)
434 ESP_LOGE(TAG, "Reconnect result %s", esp_err_to_name(err));
435 } else {
436 /* New session */
437 err = esp_mqtt_client_start(client);
438 if (err != ESP_OK)
439 ESP_LOGE(TAG, "Start result %s", esp_err_to_name(err));
394 } 440 }
441 if (uri)
442 free(uri);
443 uri = NULL;
395 } else { 444 } else {
396 uri = xstrcpy((char *)"mqtt://iot.eclipse.org:1883"); 445 ESP_LOGI(TAG, "Request MQTT connect but already connected.");
397 } 446 }
398 ESP_LOGI(TAG, "Request MQTT connect %s", uri);
399 err = esp_mqtt_client_set_uri(client, uri);
400 if (err != ESP_OK)
401 ESP_LOGE(TAG, "Set uri %s", esp_err_to_name(err));
402 err = esp_mqtt_client_start(client);
403 if (err != ESP_OK)
404 ESP_LOGE(TAG, "Result %s", esp_err_to_name(err));
405 if (uri)
406 free(uri);
407 uri = NULL;
408 xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECT); 447 xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECT);
409 448
410 } else if (uxBits & TASK_MQTT_DISCONNECT) { 449 } else if (uxBits & TASK_MQTT_DISCONNECT) {
411 ESP_LOGI(TAG, "Request MQTT disconnect"); 450 ESP_LOGI(TAG, "Request MQTT disconnect");
412 esp_mqtt_client_stop(client); 451 esp_mqtt_client_stop(client);
413 xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_DISCONNECT); 452 xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_DISCONNECT);
414 xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECTED); 453 // xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECTED);
454 ESP_LOGI(TAG, "Request MQTT disconnect done");
415 } 455 }
416 } 456 }
417 } 457 }
418 458

mercurial