# HG changeset patch # User Michiel Broek # Date 1570962254 -7200 # Node ID e33f2d325d153220462bfaae1b52b7a21afb9d94 # Parent d08c7466bb407a540216b539eee2466b8d6d3338 Experimental mqtt published messages state counter diff -r d08c7466bb40 -r e33f2d325d15 main/co2meter.c --- a/main/co2meter.c Sat Oct 12 21:05:09 2019 +0200 +++ b/main/co2meter.c Sun Oct 13 12:24:14 2019 +0200 @@ -38,6 +38,7 @@ extern SemaphoreHandle_t xSemaphoreDS18B20; ///< DS18B20 lock semaphore extern ADC_State *adc_state; ///< ADC state extern SemaphoreHandle_t xSemaphoreADC; ///< ADC lock semaphore +extern int count_pub; void app_main() @@ -328,16 +329,17 @@ publishNode(); publishUnits(); publishLogs(); - -Main_Loop1 = MAIN_LOOP1_MQTT_DISCONNECT; + Main_Loop1 = MAIN_LOOP1_WAITACK; break; case MAIN_LOOP1_WAITACK: + if (count_pub == 0) // Wait until all published messages are sent. + Main_Loop1 = MAIN_LOOP1_MQTT_DISCONNECT; break; case MAIN_LOOP1_MQTT_DISCONNECT: ESP_LOGI(TAG, "Loop timer: Disconnect MQTT"); - connect_mqtt(false); + connect_mqtt(false); // Doesn't really disconnect. Main_Loop1 = MAIN_LOOP1_DISCONNECT; break; diff -r d08c7466bb40 -r e33f2d325d15 main/task_mqtt.c --- a/main/task_mqtt.c Sat Oct 12 21:05:09 2019 +0200 +++ b/main/task_mqtt.c Sun Oct 13 12:24:14 2019 +0200 @@ -10,11 +10,12 @@ static const char *TAG = "task_mqtt"; EventGroupHandle_t xEventGroupMQTT; ///< Events MQTT task - +SemaphoreHandle_t xSemaphorePcounter; ///< Publish counter semaphore. uint64_t Sequence = 0; ///< Sequence stored in NVS nvs_handle_t seq_handle; ///< NVS handle +int count_pub = 0; ///< Outstanding published messages. +esp_mqtt_client_handle_t client; ///< MQTT client handle -esp_mqtt_client_handle_t client; ///< MQTT client handle const int TASK_MQTT_CONNECT = BIT0; ///< Request MQTT connection const int TASK_MQTT_DISCONNECT = BIT1; ///< Request MQTT disconnect const int TASK_MQTT_CONNECTED = BIT2; ///< MQTT is connected @@ -28,10 +29,8 @@ extern SemaphoreHandle_t xSemaphoreADC; ///< ADC lock semaphore extern WIFI_State *wifi_state; ///< WiFi state extern SemaphoreHandle_t xSemaphoreWiFi; ///< WiFi lock semaphore - extern unit_t units[3]; extern SemaphoreHandle_t xSemaphoreUnits; - extern const esp_app_desc_t *app_desc; @@ -92,6 +91,13 @@ esp_mqtt_client_publish(client, topic, payload, strlen(payload), 1, 0); else esp_mqtt_client_publish(client, topic, NULL, 0, 1, 0); + if (xSemaphoreTake(xSemaphorePcounter, 10) == pdTRUE) { + count_pub++; +printf(" up %d\n", count_pub); + xSemaphoreGive(xSemaphorePcounter); + } else { + ESP_LOGE(TAG, "Missed lock 1"); + } } @@ -183,10 +189,7 @@ payload = xstrcat(payload, config.uuid); payload = xstrcat(payload, (char *)"\","); payload = xstrcat(payload, (char *)"\"properties\":{\"hardwaremake\":\"Unknown\",\"hardwaremodel\":\""); - sprintf(buf, "ESP32 %d CPU WiFi%s%s rev %d", chip_info.cores, - (chip_info.features & CHIP_FEATURE_BT) ? "/BT" : "", - (chip_info.features & CHIP_FEATURE_BLE) ? "/BLE" : "", - chip_info.revision); + sprintf(buf, "ESP32 %d cores rev %d, WiFi bgn", chip_info.cores, chip_info.revision); payload = xstrcat(payload, buf); payload = xstrcat(payload, (char *)"\",\"os\":\"FreeRTOS\",\"os_version\":\"Unknown\",\"FW\":\""); payload = xstrcat(payload, (char *)app_desc->version); @@ -279,6 +282,15 @@ case MQTT_EVENT_PUBLISHED: ESP_LOGI(TAG, "MQTT_EVENT_PUBLISHED, msg_id=%d", event->msg_id); + if (xSemaphoreTake(xSemaphorePcounter, 10) == pdTRUE) { + if (count_pub) { + count_pub--; +printf("down %d\n", count_pub); + } + xSemaphoreGive(xSemaphorePcounter); + } else { + ESP_LOGE(TAG, "Missed lock 2"); + } break; case MQTT_EVENT_DATA: @@ -306,13 +318,11 @@ static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data) { -// ESP_LOGI(TAG, "Event dispatched from event loop base=%s, event_id=%d", base, event_id); mqtt_event_handler_cb(event_data); } - /* * Task to read temperature sensors on request. */ @@ -321,6 +331,7 @@ esp_err_t err; ESP_LOGI(TAG, "Starting MQTT task"); + xSemaphorePcounter = xSemaphoreCreateMutex(); /* * Initialize Sequence counter from NVS @@ -351,7 +362,7 @@ esp_mqtt_client_config_t mqtt_cfg = { .uri = "mqtt://seaport.mbse.ym", }; - /*esp_mqtt_client_handle_t */ client = esp_mqtt_client_init(&mqtt_cfg); + client = esp_mqtt_client_init(&mqtt_cfg); esp_mqtt_client_register_event(client, ESP_EVENT_ANY_ID, mqtt_event_handler, client); /* @@ -364,22 +375,19 @@ if (uxBits & TASK_MQTT_CONNECT) { ESP_LOGI(TAG, "Request MQTT connect"); err = esp_mqtt_client_start(client); - ESP_LOGI(TAG, "Result %s", esp_err_to_name(err)); + if (err != ESP_OK) + ESP_LOGI(TAG, "Result %s", esp_err_to_name(err)); xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECT); } else if (uxBits & TASK_MQTT_DISCONNECT) { ESP_LOGI(TAG, "Request MQTT disconnect"); - // publish disconnect messages - err = esp_mqtt_client_stop(client); - ESP_LOGI(TAG, "Result %s", esp_err_to_name(err)); - + esp_mqtt_client_stop(client); err = nvs_commit(seq_handle); if (err != ESP_OK) ESP_LOGE(TAG, "Error %s commit NVS", esp_err_to_name(err)); xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_DISCONNECT); xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECTED); } - vTaskDelay( (TickType_t)10); } }