9 |
9 |
10 static const char *TAG = "task_mqtt"; |
10 static const char *TAG = "task_mqtt"; |
11 |
11 |
12 EventGroupHandle_t xEventGroupMQTT; ///< Events MQTT task |
12 EventGroupHandle_t xEventGroupMQTT; ///< Events MQTT task |
13 SemaphoreHandle_t xSemaphorePcounter; ///< Publish counter semaphore. |
13 SemaphoreHandle_t xSemaphorePcounter; ///< Publish counter semaphore. |
14 uint64_t Sequence = 0; ///< Sequence stored in NVS |
|
15 nvs_handle_t seq_handle; ///< NVS handle |
|
16 int count_pub = 0; ///< Outstanding published messages. |
14 int count_pub = 0; ///< Outstanding published messages. |
17 esp_mqtt_client_handle_t client; ///< MQTT client handle |
15 esp_mqtt_client_handle_t client; ///< MQTT client handle |
18 |
16 |
19 const int TASK_MQTT_CONNECT = BIT0; ///< Request MQTT connection |
17 const int TASK_MQTT_CONNECT = BIT0; ///< Request MQTT connection |
20 const int TASK_MQTT_DISCONNECT = BIT1; ///< Request MQTT disconnect |
18 const int TASK_MQTT_DISCONNECT = BIT1; ///< Request MQTT disconnect |
58 * @brief Generate the mqtt payload header. |
56 * @brief Generate the mqtt payload header. |
59 * @return Allocated character string with the header. |
57 * @return Allocated character string with the header. |
60 */ |
58 */ |
61 char *payload_header(void) |
59 char *payload_header(void) |
62 { |
60 { |
63 char *tmp, buf[128]; |
61 char *tmp; |
64 esp_err_t err; |
62 |
65 |
63 tmp = xstrcpy((char *)"{\"metric\":"); |
66 tmp = xstrcpy((char *)"{\"seq\":"); |
|
67 sprintf(buf, "%lld", Sequence++); |
|
68 err = nvs_set_u64(seq_handle, "Sequence_cnt", Sequence); |
|
69 if (err != ESP_OK) |
|
70 ESP_LOGE(TAG, "Error %s write Sequence to NVS", esp_err_to_name(err)); |
|
71 tmp = xstrcat(tmp, buf); |
|
72 tmp = xstrcat(tmp, (char *)",\"metric\":"); |
|
73 return tmp; |
64 return tmp; |
74 } |
65 } |
75 |
66 |
76 |
67 |
77 |
68 |
361 esp_err_t err; |
352 esp_err_t err; |
362 char *uri = NULL, port[11]; |
353 char *uri = NULL, port[11]; |
363 |
354 |
364 ESP_LOGI(TAG, "Starting MQTT task"); |
355 ESP_LOGI(TAG, "Starting MQTT task"); |
365 xSemaphorePcounter = xSemaphoreCreateMutex(); |
356 xSemaphorePcounter = xSemaphoreCreateMutex(); |
366 |
|
367 /* |
|
368 * Initialize Sequence counter from NVS |
|
369 */ |
|
370 err = nvs_open("storage", NVS_READWRITE, &seq_handle); |
|
371 if (err != ESP_OK) { |
|
372 ESP_LOGE(TAG, "Error (%s) opening NVS handle", esp_err_to_name(err)); |
|
373 } else { |
|
374 err = nvs_get_u64(seq_handle, "Sequence_cnt", &Sequence); |
|
375 switch (err) { |
|
376 case ESP_OK: |
|
377 ESP_LOGI(TAG, "Sequence counter from NVS = %lld", Sequence); |
|
378 break; |
|
379 |
|
380 case ESP_ERR_NVS_NOT_FOUND: |
|
381 ESP_LOGI(TAG, "Sequence counter not found"); |
|
382 break; |
|
383 |
|
384 default: |
|
385 ESP_LOGE(TAG, "Error (%s) init Sequence", esp_err_to_name(err)); |
|
386 break; |
|
387 } |
|
388 } |
|
389 |
357 |
390 /* event handler and event group for the wifi driver */ |
358 /* event handler and event group for the wifi driver */ |
391 xEventGroupMQTT = xEventGroupCreate(); |
359 xEventGroupMQTT = xEventGroupCreate(); |
392 EventBits_t uxBits; |
360 EventBits_t uxBits; |
393 esp_mqtt_client_config_t mqtt_cfg = { |
361 esp_mqtt_client_config_t mqtt_cfg = { |
434 xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECT); |
402 xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECT); |
435 |
403 |
436 } else if (uxBits & TASK_MQTT_DISCONNECT) { |
404 } else if (uxBits & TASK_MQTT_DISCONNECT) { |
437 ESP_LOGI(TAG, "Request MQTT disconnect"); |
405 ESP_LOGI(TAG, "Request MQTT disconnect"); |
438 esp_mqtt_client_stop(client); |
406 esp_mqtt_client_stop(client); |
439 err = nvs_commit(seq_handle); |
|
440 if (err != ESP_OK) |
|
441 ESP_LOGE(TAG, "Error %s commit NVS", esp_err_to_name(err)); |
|
442 xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_DISCONNECT); |
407 xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_DISCONNECT); |
443 xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECTED); |
408 xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECTED); |
444 } |
409 } |
445 } |
410 } |
446 } |
411 } |