main/task_mqtt.c

changeset 47
1ab1f4a8c328
parent 45
61a106fd9d9e
child 54
3b1834482899
equal deleted inserted replaced
46:2cf4b15895f2 47:1ab1f4a8c328
9 9
10 static const char *TAG = "task_mqtt"; 10 static const char *TAG = "task_mqtt";
11 11
12 EventGroupHandle_t xEventGroupMQTT; ///< Events MQTT task 12 EventGroupHandle_t xEventGroupMQTT; ///< Events MQTT task
13 SemaphoreHandle_t xSemaphorePcounter; ///< Publish counter semaphore. 13 SemaphoreHandle_t xSemaphorePcounter; ///< Publish counter semaphore.
14 uint64_t Sequence = 0; ///< Sequence stored in NVS
15 nvs_handle_t seq_handle; ///< NVS handle
16 int count_pub = 0; ///< Outstanding published messages. 14 int count_pub = 0; ///< Outstanding published messages.
17 esp_mqtt_client_handle_t client; ///< MQTT client handle 15 esp_mqtt_client_handle_t client; ///< MQTT client handle
18 16
19 const int TASK_MQTT_CONNECT = BIT0; ///< Request MQTT connection 17 const int TASK_MQTT_CONNECT = BIT0; ///< Request MQTT connection
20 const int TASK_MQTT_DISCONNECT = BIT1; ///< Request MQTT disconnect 18 const int TASK_MQTT_DISCONNECT = BIT1; ///< Request MQTT disconnect
58 * @brief Generate the mqtt payload header. 56 * @brief Generate the mqtt payload header.
59 * @return Allocated character string with the header. 57 * @return Allocated character string with the header.
60 */ 58 */
61 char *payload_header(void) 59 char *payload_header(void)
62 { 60 {
63 char *tmp, buf[128]; 61 char *tmp;
64 esp_err_t err; 62
65 63 tmp = xstrcpy((char *)"{\"metric\":");
66 tmp = xstrcpy((char *)"{\"seq\":");
67 sprintf(buf, "%lld", Sequence++);
68 err = nvs_set_u64(seq_handle, "Sequence_cnt", Sequence);
69 if (err != ESP_OK)
70 ESP_LOGE(TAG, "Error %s write Sequence to NVS", esp_err_to_name(err));
71 tmp = xstrcat(tmp, buf);
72 tmp = xstrcat(tmp, (char *)",\"metric\":");
73 return tmp; 64 return tmp;
74 } 65 }
75 66
76 67
77 68
361 esp_err_t err; 352 esp_err_t err;
362 char *uri = NULL, port[11]; 353 char *uri = NULL, port[11];
363 354
364 ESP_LOGI(TAG, "Starting MQTT task"); 355 ESP_LOGI(TAG, "Starting MQTT task");
365 xSemaphorePcounter = xSemaphoreCreateMutex(); 356 xSemaphorePcounter = xSemaphoreCreateMutex();
366
367 /*
368 * Initialize Sequence counter from NVS
369 */
370 err = nvs_open("storage", NVS_READWRITE, &seq_handle);
371 if (err != ESP_OK) {
372 ESP_LOGE(TAG, "Error (%s) opening NVS handle", esp_err_to_name(err));
373 } else {
374 err = nvs_get_u64(seq_handle, "Sequence_cnt", &Sequence);
375 switch (err) {
376 case ESP_OK:
377 ESP_LOGI(TAG, "Sequence counter from NVS = %lld", Sequence);
378 break;
379
380 case ESP_ERR_NVS_NOT_FOUND:
381 ESP_LOGI(TAG, "Sequence counter not found");
382 break;
383
384 default:
385 ESP_LOGE(TAG, "Error (%s) init Sequence", esp_err_to_name(err));
386 break;
387 }
388 }
389 357
390 /* event handler and event group for the wifi driver */ 358 /* event handler and event group for the wifi driver */
391 xEventGroupMQTT = xEventGroupCreate(); 359 xEventGroupMQTT = xEventGroupCreate();
392 EventBits_t uxBits; 360 EventBits_t uxBits;
393 esp_mqtt_client_config_t mqtt_cfg = { 361 esp_mqtt_client_config_t mqtt_cfg = {
434 xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECT); 402 xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECT);
435 403
436 } else if (uxBits & TASK_MQTT_DISCONNECT) { 404 } else if (uxBits & TASK_MQTT_DISCONNECT) {
437 ESP_LOGI(TAG, "Request MQTT disconnect"); 405 ESP_LOGI(TAG, "Request MQTT disconnect");
438 esp_mqtt_client_stop(client); 406 esp_mqtt_client_stop(client);
439 err = nvs_commit(seq_handle);
440 if (err != ESP_OK)
441 ESP_LOGE(TAG, "Error %s commit NVS", esp_err_to_name(err));
442 xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_DISCONNECT); 407 xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_DISCONNECT);
443 xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECTED); 408 xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECTED);
444 } 409 }
445 } 410 }
446 } 411 }

mercurial