main/task_mqtt.c

changeset 0
88d965579617
child 1
1082183cd6bb
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/main/task_mqtt.c	Tue Oct 08 12:00:31 2019 +0200
@@ -0,0 +1,359 @@
+/**
+ * @file task_mqtt.c
+ * @brief The FreeRTOS task to maintain MQTT connections.
+ */
+
+
+#include "config.h"
+
+
+static const char		*TAG = "task_mqtt";
+
+EventGroupHandle_t		xEventGroupMQTT;	///< Events MQTT task
+
+uint64_t			Sequence = 0;		///< Sequence stored in NVS
+nvs_handle_t			seq_handle;		///< NVS handle
+
+esp_mqtt_client_handle_t	client;			///< MQTT client handle
+const int TASK_MQTT_CONNECT = BIT0;			///< Request MQTT connection
+const int TASK_MQTT_DISCONNECT = BIT1;			///< Request MQTT disconnect
+const int TASK_MQTT_CONNECTED = BIT2;			///< MQTT is connected
+
+const char			*sensState[] = { "Ok", "Error" };
+const char			*unitMode[] = { "Off", "On" };
+
+extern DS18B20_State            *ds18b20_state;         ///< DS18B20 state
+extern SemaphoreHandle_t        xSemaphoreDS18B20;      ///< DS18B20 lock semaphore
+extern ADC_State		*adc_state;		///< ADC state
+extern SemaphoreHandle_t	xSemaphoreADC;		///< ADC lock semaphore
+extern WIFI_State		*wifi_state;		///< WiFi state
+extern SemaphoreHandle_t	xSemaphoreWiFi;		///< WiFi lock semaphore
+
+extern unit_t			units[3];
+extern const esp_app_desc_t	*app_desc;
+
+
+
+void connect_mqtt(bool state)
+{
+    if (state)
+    	xEventGroupSetBits(xEventGroupMQTT, TASK_MQTT_CONNECT);
+    else
+    	xEventGroupSetBits(xEventGroupMQTT, TASK_MQTT_DISCONNECT);
+}
+
+
+
+bool ready_mqtt(void)
+{
+    if (xEventGroupGetBits(xEventGroupMQTT) & TASK_MQTT_CONNECTED)
+        return true;
+    return false;
+}
+
+
+
+char *payload_header(void)
+{
+    char        *tmp, buf[128];
+    esp_err_t   err;
+
+    tmp = xstrcpy((char *)"{\"seq\":");
+    sprintf(buf, "%lld", Sequence++);
+    err = nvs_set_u64(seq_handle, "Sequence_cnt", Sequence);
+    if (err != ESP_OK)
+	ESP_LOGE(TAG, "Error %s write Sequence to NVS", esp_err_to_name(err));
+    tmp = xstrcat(tmp, buf);
+    tmp = xstrcat(tmp, (char *)",\"metric\":");
+    return tmp;
+}
+
+
+
+char *topic_base(char *msgtype)
+{
+    char        *tmp;
+
+    tmp = xstrcpy((char *)"mbv1.0/pressure/");
+    tmp = xstrcat(tmp, msgtype);
+    tmp = xstrcat(tmp, (char *)"/");
+    tmp = xstrcat(tmp, config.hostname);
+    return tmp;
+}
+
+
+
+void publisher(char *topic, char *payload)
+{
+    // publish the data
+    if (payload)
+        esp_mqtt_client_publish(client, topic, payload, strlen(payload), 1, 0);
+    else
+	esp_mqtt_client_publish(client, topic, NULL, 0, 1, 0);
+}
+
+
+
+char *unit_data(int i)
+{
+    char	*payload = NULL;
+    char        buf[128];
+
+    payload = xstrcpy((char *)"{\"uuid\":\"");
+    payload = xstrcat(payload, units[i].uuid);
+    payload = xstrcat(payload, (char *)"\",\"alias\":\"");
+    payload = xstrcat(payload, units[i].alias);
+
+    // temperature_state temperature_address temperature
+    payload = xstrcat(payload, (char *)"\",\"temperature\":{\"state\":\"");
+    payload = xstrcat(payload, (char *)sensState[units[i].temperature_state]);
+    payload = xstrcat(payload, (char *)"\",\"address\":\"");
+    payload = xstrcat(payload, (char *)units[i].temperature_address);
+    payload = xstrcat(payload, (char *)"\",\"temperature\":");
+    sprintf(buf, "%.3f", units[i].temperature / 1000.0);
+    payload = xstrcat(payload, buf);
+
+    // pressure_state pressure_channel pressure_voltage pressure_zero pressure
+    payload = xstrcat(payload, (char *)"},\"pressure\":{\"state\":\"");
+    payload = xstrcat(payload, (char *)sensState[units[i].pressure_state]);
+    payload = xstrcat(payload, (char *)"\",\"channel\":");
+    sprintf(buf, "%d", units[i].pressure_channel);
+    payload = xstrcat(payload, buf);
+    payload = xstrcat(payload, (char *)",\"voltage\":");
+    sprintf(buf, "%.3f", units[i].pressure_voltage / 1000.0);
+    payload = xstrcat(payload, buf);
+    payload = xstrcat(payload, (char *)",\"zero\":");
+    sprintf(buf, "%.3f", units[i].pressure_zero / 1000.0);
+    payload = xstrcat(payload, buf);
+    payload = xstrcat(payload, (char *)",\"bar\":");
+    sprintf(buf, "%.3f", units[i].pressure / 1000.0);
+    payload = xstrcat(payload, buf);
+    payload = xstrcat(payload, (char *)"},\"mode\":\"");
+    payload = xstrcat(payload, (char *)unitMode[units[i].mode]);
+    payload = xstrcat(payload, (char *)"\"}");
+    return payload;
+}
+
+
+
+void publishUnits(void)
+{
+    char        *topic = NULL, *payload = NULL, *payloadu = NULL;
+    int		i;
+    bool	comma = false;
+
+    payload = payload_header();
+    payload = xstrcat(payload, (char *)"{\"units\":[");
+    for (i = 0; i < 3; i++) {
+	    if (comma)
+                payload = xstrcat(payload, (char *)",");
+            payloadu = unit_data(i);
+            payload = xstrcat(payload, payloadu);
+            comma = true;
+            free(payloadu);
+            payloadu = NULL;
+    }
+    payload = xstrcat(payload, (char *)"]}}");
+    topic = topic_base((char *)"DBIRTH");
+    publisher(topic, payload);
+    free(topic);
+    topic = NULL;
+    free(payload);
+    payload = NULL;
+}
+
+
+
+void publishNode(void)
+{
+    char                *topic = NULL, *payload = NULL, buf[64];
+    esp_chip_info_t	chip_info;
+
+    esp_chip_info(&chip_info);
+    payload = payload_header();
+    payload = xstrcat(payload, (char *)"{\"uuid\":\"");
+    payload = xstrcat(payload, config.uuid);
+    payload = xstrcat(payload, (char *)"\",");
+    payload = xstrcat(payload, (char *)"\"properties\":{\"hardwaremake\":\"Unknown\",\"hardwaremodel\":\"");
+    sprintf(buf, "ESP32 %d CPU WiFi%s%s rev %d", chip_info.cores,
+		(chip_info.features & CHIP_FEATURE_BT) ? "/BT" : "", 
+		(chip_info.features & CHIP_FEATURE_BLE) ? "/BLE" : "",
+		chip_info.revision);
+    payload = xstrcat(payload, buf);
+    payload = xstrcat(payload, (char *)"\",\"os\":\"FreeRTOS\",\"os_version\":\"Unknown\",\"FW\":\"");
+    payload = xstrcat(payload, (char *)app_desc->version);
+    payload = xstrcat(payload, (char *)"\"}");
+
+    if (xSemaphoreTake(xSemaphoreDS18B20, 10) == pdTRUE) {
+        payload = xstrcat(payload, (char *)",\"THB\":{\"temperature\":");
+        sprintf(buf, "%.3f", ds18b20_state->bottle_temperature);
+        payload = xstrcat(payload, buf);
+        payload = xstrcat(payload, (char *)"}");
+	xSemaphoreGive(xSemaphoreDS18B20);
+    }
+
+    if (xSemaphoreTake(xSemaphoreWiFi, 10) == pdTRUE) {
+	payload = xstrcat(payload, (char *)",\"net\":{\"address\":\"");
+	payload = xstrcat(payload, wifi_state->STA_ip);
+        payload = xstrcat(payload, (char *)"\",\"ifname\":\"sta\",\"rssi\":");
+	sprintf(buf, "%d", wifi_state->STA_rssi);
+	payload = xstrcat(payload, buf);
+        payload = xstrcat(payload, (char *)"}");
+	xSemaphoreGive(xSemaphoreWiFi);
+    }
+
+    payload = xstrcat(payload, (char *)"}}");
+    // Only NBIRTH messages, no NDATA in this project.
+    topic = topic_base((char *)"NBIRTH");
+    publisher(topic, payload);
+    free(topic);
+    topic = NULL;
+    free(payload);
+    payload = NULL;
+}
+
+
+
+static esp_err_t mqtt_event_handler_cb(esp_mqtt_event_handle_t event)
+{
+    switch (event->event_id) {
+
+        case MQTT_EVENT_CONNECTED:
+            ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED");
+	    xEventGroupSetBits(xEventGroupMQTT, TASK_MQTT_CONNECTED);
+
+//	    topic = topic_base((char *)"NCMD");
+//            topic = xstrcat(topic, (char *)"/#");
+//	    ESP_LOGI(TAG, "Subscribe %s", topic);
+//            msg_id = esp_mqtt_client_subscribe(client, topic, 0);
+//            ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id);
+//	    free(topic);
+
+//	    topic = topic_base((char *)"DCMD");
+//            topic = xstrcat(topic, (char *)"/#");
+//	    ESP_LOGI(TAG, "Subscribe %s", topic);
+//            msg_id = esp_mqtt_client_subscribe(client, topic, 1);
+//            ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id);
+//	    free(topic);
+//	    topic = NULL;
+            break;
+
+        case MQTT_EVENT_DISCONNECTED:
+            ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED");
+	    xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECTED);
+            break;
+
+        case MQTT_EVENT_SUBSCRIBED:
+            ESP_LOGI(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d", event->msg_id);
+            break;
+
+        case MQTT_EVENT_UNSUBSCRIBED:
+            ESP_LOGI(TAG, "MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event->msg_id);
+            break;
+
+        case MQTT_EVENT_PUBLISHED:
+            ESP_LOGI(TAG, "MQTT_EVENT_PUBLISHED, msg_id=%d", event->msg_id);
+            break;
+
+        case MQTT_EVENT_DATA:
+            ESP_LOGI(TAG, "MQTT_EVENT_DATA");
+            printf("TOPIC=%.*s\r\n", event->topic_len, event->topic);
+            printf("DATA=%.*s\r\n", event->data_len, event->data);
+            break;
+
+        case MQTT_EVENT_ERROR:
+            ESP_LOGI(TAG, "MQTT_EVENT_ERROR");
+            break;
+
+	case MQTT_EVENT_BEFORE_CONNECT:
+	    ESP_LOGI(TAG, "MQTT_EVENT_BEFORE_CONNECT");
+	    // Configure connection can be here.
+	    break;
+
+        default:
+            ESP_LOGI(TAG, "Other event id:%d", event->event_id);
+            break;
+    }
+    return ESP_OK;
+}
+
+
+
+static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data) {
+    ESP_LOGI(TAG, "Event dispatched from event loop base=%s, event_id=%d", base, event_id);
+    mqtt_event_handler_cb(event_data);
+}
+
+
+
+
+/*
+ * Task to read temperature sensors on request.
+ */
+void task_mqtt(void *pvParameter)
+{
+    esp_err_t	err;
+
+    ESP_LOGI(TAG, "Starting MQTT task");
+
+    /*
+     * Initialize Sequence counter from NVS
+     */
+    err = nvs_open("storage", NVS_READWRITE, &seq_handle);
+    if (err != ESP_OK) {
+        ESP_LOGI(TAG, "Error (%s) opening NVS handle", esp_err_to_name(err));
+    } else {
+	err = nvs_get_u64(seq_handle, "Sequence_cnt", &Sequence);
+	switch (err) {
+	    case ESP_OK:
+		ESP_LOGI(TAG, "Sequence counter from NVS = %lld", Sequence);
+		break;
+
+	    case ESP_ERR_NVS_NOT_FOUND:
+		ESP_LOGI(TAG, "Sequence counter not found");
+		break;
+
+	    default:
+		ESP_LOGI(TAG, "Error (%s) init Sequence", esp_err_to_name(err));
+		break;
+	}
+    }
+
+    /* event handler and event group for the wifi driver */
+    xEventGroupMQTT = xEventGroupCreate();
+    EventBits_t uxBits;
+    esp_mqtt_client_config_t mqtt_cfg = {
+        .uri = "mqtt://seaport.mbse.ym",
+    };
+    /*esp_mqtt_client_handle_t */ client = esp_mqtt_client_init(&mqtt_cfg);
+    esp_mqtt_client_register_event(client, ESP_EVENT_ANY_ID, mqtt_event_handler, client);
+
+    /*
+     * Task loop forever.
+     */
+    while (1) {
+
+	uxBits = xEventGroupWaitBits(xEventGroupMQTT, TASK_MQTT_CONNECT | TASK_MQTT_DISCONNECT, pdFALSE, pdFALSE, portMAX_DELAY );
+
+	if (uxBits & TASK_MQTT_CONNECT) {
+	    ESP_LOGI(TAG, "Request MQTT connect");
+	    err = esp_mqtt_client_start(client);
+	    ESP_LOGI(TAG, "Result %s", esp_err_to_name(err));
+	    xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECT);
+
+	} else if (uxBits & TASK_MQTT_DISCONNECT) {
+	    ESP_LOGI(TAG, "Request MQTT disconnect");
+	    // publish disconnect messages
+	    err = esp_mqtt_client_stop(client);
+	    ESP_LOGI(TAG, "Result %s", esp_err_to_name(err));
+
+	    err = nvs_commit(seq_handle);
+	    if (err != ESP_OK)
+		ESP_LOGE(TAG, "Error %s commit NVS", esp_err_to_name(err));
+	    xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_DISCONNECT);
+	    xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECTED);
+	}
+	vTaskDelay( (TickType_t)10);
+    }
+}
+

mercurial