Experimental mqtt published messages state counter

Sun, 13 Oct 2019 12:24:14 +0200

author
Michiel Broek <mbroek@mbse.eu>
date
Sun, 13 Oct 2019 12:24:14 +0200
changeset 11
e33f2d325d15
parent 10
d08c7466bb40
child 12
7dc9003f86a8

Experimental mqtt published messages state counter

main/co2meter.c file | annotate | diff | comparison | revisions
main/task_mqtt.c file | annotate | diff | comparison | revisions
--- a/main/co2meter.c	Sat Oct 12 21:05:09 2019 +0200
+++ b/main/co2meter.c	Sun Oct 13 12:24:14 2019 +0200
@@ -38,6 +38,7 @@
 extern SemaphoreHandle_t        	xSemaphoreDS18B20;      	///< DS18B20 lock semaphore
 extern ADC_State                	*adc_state;             	///< ADC state
 extern SemaphoreHandle_t        	xSemaphoreADC;          	///< ADC lock semaphore
+extern int				count_pub;
 
 
 void app_main()
@@ -328,16 +329,17 @@
 		    publishNode();
 		    publishUnits();
 		    publishLogs();
-
-Main_Loop1 = MAIN_LOOP1_MQTT_DISCONNECT;
+		    Main_Loop1 = MAIN_LOOP1_WAITACK;
 		    break;
 
 		case MAIN_LOOP1_WAITACK:
+		    if (count_pub == 0) // Wait until all published messages are sent.
+			Main_Loop1 = MAIN_LOOP1_MQTT_DISCONNECT;
 		    break;
 
 		case MAIN_LOOP1_MQTT_DISCONNECT:
 		    ESP_LOGI(TAG, "Loop timer: Disconnect MQTT");
-		    connect_mqtt(false);
+		    connect_mqtt(false); // Doesn't really disconnect.
                     Main_Loop1 = MAIN_LOOP1_DISCONNECT;
 		    break;
 
--- a/main/task_mqtt.c	Sat Oct 12 21:05:09 2019 +0200
+++ b/main/task_mqtt.c	Sun Oct 13 12:24:14 2019 +0200
@@ -10,11 +10,12 @@
 static const char		*TAG = "task_mqtt";
 
 EventGroupHandle_t		xEventGroupMQTT;	///< Events MQTT task
-
+SemaphoreHandle_t		xSemaphorePcounter;	///< Publish counter semaphore.
 uint64_t			Sequence = 0;		///< Sequence stored in NVS
 nvs_handle_t			seq_handle;		///< NVS handle
+int				count_pub = 0;		///< Outstanding published messages.
+esp_mqtt_client_handle_t	client;			///< MQTT client handle
 
-esp_mqtt_client_handle_t	client;			///< MQTT client handle
 const int TASK_MQTT_CONNECT = BIT0;			///< Request MQTT connection
 const int TASK_MQTT_DISCONNECT = BIT1;			///< Request MQTT disconnect
 const int TASK_MQTT_CONNECTED = BIT2;			///< MQTT is connected
@@ -28,10 +29,8 @@
 extern SemaphoreHandle_t	xSemaphoreADC;		///< ADC lock semaphore
 extern WIFI_State		*wifi_state;		///< WiFi state
 extern SemaphoreHandle_t	xSemaphoreWiFi;		///< WiFi lock semaphore
-
 extern unit_t			units[3];
 extern SemaphoreHandle_t	xSemaphoreUnits;
-
 extern const esp_app_desc_t	*app_desc;
 
 
@@ -92,6 +91,13 @@
         esp_mqtt_client_publish(client, topic, payload, strlen(payload), 1, 0);
     else
 	esp_mqtt_client_publish(client, topic, NULL, 0, 1, 0);
+    if (xSemaphoreTake(xSemaphorePcounter, 10) == pdTRUE) {
+    	count_pub++;
+printf("  up %d\n", count_pub);
+	xSemaphoreGive(xSemaphorePcounter);
+    } else {
+	ESP_LOGE(TAG, "Missed lock 1");
+    }
 }
 
 
@@ -183,10 +189,7 @@
     payload = xstrcat(payload, config.uuid);
     payload = xstrcat(payload, (char *)"\",");
     payload = xstrcat(payload, (char *)"\"properties\":{\"hardwaremake\":\"Unknown\",\"hardwaremodel\":\"");
-    sprintf(buf, "ESP32 %d CPU WiFi%s%s rev %d", chip_info.cores,
-		(chip_info.features & CHIP_FEATURE_BT) ? "/BT" : "", 
-		(chip_info.features & CHIP_FEATURE_BLE) ? "/BLE" : "",
-		chip_info.revision);
+    sprintf(buf, "ESP32 %d cores rev %d, WiFi bgn", chip_info.cores, chip_info.revision);
     payload = xstrcat(payload, buf);
     payload = xstrcat(payload, (char *)"\",\"os\":\"FreeRTOS\",\"os_version\":\"Unknown\",\"FW\":\"");
     payload = xstrcat(payload, (char *)app_desc->version);
@@ -279,6 +282,15 @@
 
         case MQTT_EVENT_PUBLISHED:
             ESP_LOGI(TAG, "MQTT_EVENT_PUBLISHED, msg_id=%d", event->msg_id);
+	    if (xSemaphoreTake(xSemaphorePcounter, 10) == pdTRUE) {
+	    	if (count_pub) {
+		    count_pub--;
+printf("down %d\n", count_pub);
+	    	}
+		xSemaphoreGive(xSemaphorePcounter);
+	    } else {
+        	ESP_LOGE(TAG, "Missed lock 2");
+	    }
             break;
 
         case MQTT_EVENT_DATA:
@@ -306,13 +318,11 @@
 
 
 static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data) {
-//    ESP_LOGI(TAG, "Event dispatched from event loop base=%s, event_id=%d", base, event_id);
     mqtt_event_handler_cb(event_data);
 }
 
 
 
-
 /*
  * Task to read temperature sensors on request.
  */
@@ -321,6 +331,7 @@
     esp_err_t	err;
 
     ESP_LOGI(TAG, "Starting MQTT task");
+    xSemaphorePcounter = xSemaphoreCreateMutex();
 
     /*
      * Initialize Sequence counter from NVS
@@ -351,7 +362,7 @@
     esp_mqtt_client_config_t mqtt_cfg = {
         .uri = "mqtt://seaport.mbse.ym",
     };
-    /*esp_mqtt_client_handle_t */ client = esp_mqtt_client_init(&mqtt_cfg);
+    client = esp_mqtt_client_init(&mqtt_cfg);
     esp_mqtt_client_register_event(client, ESP_EVENT_ANY_ID, mqtt_event_handler, client);
 
     /*
@@ -364,22 +375,19 @@
 	if (uxBits & TASK_MQTT_CONNECT) {
 	    ESP_LOGI(TAG, "Request MQTT connect");
 	    err = esp_mqtt_client_start(client);
-	    ESP_LOGI(TAG, "Result %s", esp_err_to_name(err));
+	    if (err != ESP_OK)
+	    	ESP_LOGI(TAG, "Result %s", esp_err_to_name(err));
 	    xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECT);
 
 	} else if (uxBits & TASK_MQTT_DISCONNECT) {
 	    ESP_LOGI(TAG, "Request MQTT disconnect");
-	    // publish disconnect messages
-	    err = esp_mqtt_client_stop(client);
-	    ESP_LOGI(TAG, "Result %s", esp_err_to_name(err));
-
+	    esp_mqtt_client_stop(client);
 	    err = nvs_commit(seq_handle);
 	    if (err != ESP_OK)
 		ESP_LOGE(TAG, "Error %s commit NVS", esp_err_to_name(err));
 	    xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_DISCONNECT);
 	    xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECTED);
 	}
-	vTaskDelay( (TickType_t)10);
     }
 }
 

mercurial