main/task_mqtt.c

changeset 5
b1f38105ca7e
child 6
bad3414f7bc4
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/main/task_mqtt.c	Thu Mar 30 17:05:05 2023 +0200
@@ -0,0 +1,313 @@
+/**
+ * @file task_mqtt.c
+ * @brief The FreeRTOS task to maintain MQTT connections.
+ */
+
+
+#include "config.h"
+
+
+static const char		*TAG = "task_mqtt";
+
+EventGroupHandle_t		xEventGroupMQTT;	///< Events MQTT task
+SemaphoreHandle_t		xSemaphorePcounter;	///< Publish counter semaphore.
+int				count_pub = 0;		///< Outstanding published messages.
+esp_mqtt_client_handle_t	client;			///< MQTT client handle
+
+const int TASK_MQTT_CONNECT = BIT0;			///< Request MQTT connection
+const int TASK_MQTT_DISCONNECT = BIT1;			///< Request MQTT disconnect
+const int TASK_MQTT_CONNECTED = BIT2;			///< MQTT is connected
+
+const char			*sensState[] = { "OK", "ERROR" };	///< Sensor state strings
+const char			*unitMode[] = { "OFF", "ON" };		///< Units state strings
+
+extern BMP280_State		*bmp280_state;		///< BMP280 state
+extern SemaphoreHandle_t        xSemaphoreBMP280;	///< BMP280 lock semaphore
+extern INA219_State		*ina219_state;		///< INA219 state
+extern SemaphoreHandle_t	xSemaphoreINA219;	///< INA219 lock semaphore
+extern WIFI_State		*wifi_state;		///< WiFi state
+extern SemaphoreHandle_t	xSemaphoreWiFi;		///< WiFi lock semaphore
+extern const esp_app_desc_t	*app_desc;
+
+extern uint32_t			Alarm;
+extern float			batteryState;
+extern float			batteryVolts;
+extern float			batteryCurrent;
+extern float			batteryPower;
+extern float			solarVolts;
+extern float			solarCurrent;
+extern float			solarPower;
+
+
+void connect_mqtt(bool state)
+{
+    if (state)
+    	xEventGroupSetBits(xEventGroupMQTT, TASK_MQTT_CONNECT);
+    else
+    	xEventGroupSetBits(xEventGroupMQTT, TASK_MQTT_DISCONNECT);
+}
+
+
+
+bool ready_mqtt(void)
+{
+    if (xEventGroupGetBits(xEventGroupMQTT) & TASK_MQTT_CONNECTED)
+        return true;
+    return false;
+}
+
+
+
+/**
+ * @brief Generate the mqtt payload header.
+ * @return Allocated character string with the header.
+ */
+char *payload_header(void)
+{
+    char        *tmp;
+
+    tmp = xstrcpy((char *)"{\"metric\":");
+    return tmp;
+}
+
+
+
+/**
+ * @brief Generate the mqtt topic base part.
+ * @return The topic string allocated in memory.
+ */
+char *topic_base(void)
+{
+    char        *tmp;
+
+#ifdef CONFIG_CODE_PRODUCTION
+    tmp = xstrcpy((char *)"balkon/");
+#endif
+#ifdef CONFIG_CODE_TESTING
+    tmp = xstrcpy((char *)"wemos/");
+#endif
+
+    return tmp;
+}
+
+
+
+/**
+ * @brief The mqtt generic publish function.
+ * @param topic The topic of the mqtt message.
+ * @param payload The payload of the mqtt message.
+ */
+void publisher(char *topic, char *payload)
+{
+    /*
+     * First count, then sent the data.
+     */
+    if (xSemaphoreTake(xSemaphorePcounter, 10) == pdTRUE) {
+        count_pub++;
+        xSemaphoreGive(xSemaphorePcounter);
+    } else {
+        ESP_LOGE(TAG, "publisher() counter lock");
+    }
+
+    if (payload)
+        esp_mqtt_client_publish(client, topic, payload, strlen(payload), 1, 0);
+    else
+	esp_mqtt_client_publish(client, topic, NULL, 0, 1, 0);
+}
+
+
+
+void publish(void)
+{
+    char        *topic = NULL, *payload = NULL, buf[64];
+
+    // {"system":{"battery":70,"alarm":0,"version":"0.2.6","rssi":-56,"wifi":88,"light":{"lux":12.34,"gain":2}},"solar":{"voltage":13.98,"current":234.1,"power":3.272718},"battery":{"voltage":13.21,"current":4.942289,"power":0.065288},"real":{"current":229.1577},"TH":{"temperature":20.2,"humidity":48.3},"output":{"relay1":0,"relay2":0,"dimmer3":0,"dimmer4":0}}
+    //
+    payload = payload_header();
+    payload = xstrcat(payload, (char *)"{\"system\":{\"battery\":");
+    sprintf(buf, "%.0f", batteryState);
+    payload = xstrcat(payload, buf);
+    payload = xstrcat(payload, (char *)",\"alarm\":");
+    sprintf(buf, "%ld", Alarm);
+    payload = xstrcat(payload, buf);
+    payload = xstrcat(payload, (char *)",\"version\":\"");
+    payload = xstrcat(payload, (char *)app_desc->version);
+    payload = xstrcat(payload, (char *)",\"rssi\":");
+
+    payload = xstrcat(payload, (char *)",\"light\":{\"lux\":");
+
+    payload = xstrcat(payload, (char *)",\"gain\":");
+
+    payload = xstrcat(payload, (char *)"}},\"solar\":{\"voltage\":");
+    sprintf(buf, "%.2f", solarVolts);
+    payload = xstrcat(payload, buf);
+    payload = xstrcat(payload, (char *)",\"current\":");
+    sprintf(buf, "%.1f", solarCurrent);
+    payload = xstrcat(payload, buf);
+    payload = xstrcat(payload, (char *)",\"power\":");
+    sprintf(buf, "%.3f", solarPower / 1000.0);
+    payload = xstrcat(payload, buf);
+    payload = xstrcat(payload, (char *)"},\"battery\":{\"voltage\":");
+    sprintf(buf, "%.2f", batteryVolts);
+    payload = xstrcat(payload, buf);
+    payload = xstrcat(payload, (char *)",\"current\":");
+    sprintf(buf, "%.1f", batteryCurrent);
+    payload = xstrcat(payload, buf);
+    payload = xstrcat(payload, (char *)",\"power\":");
+    sprintf(buf, "%.3f", batteryPower / 1000.0);
+    payload = xstrcat(payload, buf);
+    payload = xstrcat(payload, (char *)"},\"real\":{\"current\":");
+    payload = xstrcat(payload, (char *)"},\"TB\":{\"temperature\":");
+    if (xSemaphoreTake(xSemaphoreBMP280, 25) == pdTRUE) {
+	sprintf(buf, "%.2f", bmp280_state->temperature);
+	payload = xstrcat(payload, buf);
+	payload = xstrcat(payload, (char *)",\"pressure\":");
+	sprintf(buf, "%.1f", bmp280_state->pressure / 100.0);
+	payload = xstrcat(payload, buf);
+	xSemaphoreGive(xSemaphoreBMP280);
+    }
+    payload = xstrcat(payload, (char *)"},\"output\":{\"relay1\":");
+
+    payload = xstrcat(payload, (char *)",\"relay2\":");
+
+    payload = xstrcat(payload, (char *)",\"dimmer3\":");
+
+    payload = xstrcat(payload, (char *)",\"dimmer4\":");
+
+    payload = xstrcat(payload, (char *)"}}");
+    topic = topic_base();
+    topic = xstrcat(topic, (char *)"status");
+    publisher(topic, payload);
+    free(topic);
+    topic = NULL;
+    free(payload);
+    payload = NULL;
+}
+
+
+
+static esp_err_t mqtt_event_handler_cb(esp_mqtt_event_handle_t event)
+{
+    switch (event->event_id) {
+
+        case MQTT_EVENT_CONNECTED:
+            ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED");
+	    xEventGroupSetBits(xEventGroupMQTT, TASK_MQTT_CONNECTED);
+            break;
+
+        case MQTT_EVENT_DISCONNECTED:
+            ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED");
+	    xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECTED);
+            break;
+
+        case MQTT_EVENT_SUBSCRIBED:
+            ESP_LOGI(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d", event->msg_id);
+            break;
+
+        case MQTT_EVENT_UNSUBSCRIBED:
+            ESP_LOGI(TAG, "MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event->msg_id);
+            break;
+
+        case MQTT_EVENT_PUBLISHED:
+            ESP_LOGI(TAG, "MQTT_EVENT_PUBLISHED, msg_id=%d", event->msg_id);
+	    if (xSemaphoreTake(xSemaphorePcounter, 10) == pdTRUE) {
+	    	if (count_pub) {
+		    count_pub--;
+	    	}
+		xSemaphoreGive(xSemaphorePcounter);
+	    } else {
+        	ESP_LOGE(TAG, "mqtt_event_handler_cb(() lock error event");
+	    }
+            break;
+
+        case MQTT_EVENT_DATA:
+            ESP_LOGI(TAG, "MQTT_EVENT_DATA");
+            printf("TOPIC=%.*s\r\n", event->topic_len, event->topic);
+            printf("DATA=%.*s\r\n", event->data_len, event->data);
+            break;
+
+        case MQTT_EVENT_ERROR:
+            ESP_LOGI(TAG, "MQTT_EVENT_ERROR");
+            break;
+
+	case MQTT_EVENT_BEFORE_CONNECT:
+	    ESP_LOGI(TAG, "MQTT_EVENT_BEFORE_CONNECT");
+	    // Configure connection can be here.
+	    break;
+
+        default:
+            ESP_LOGI(TAG, "Other event id:%d", event->event_id);
+            break;
+    }
+    return ESP_OK;
+}
+
+
+
+static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data) {
+    mqtt_event_handler_cb(event_data);
+}
+
+
+
+/*
+ * Task to read temperature sensors on request.
+ */
+void task_mqtt(void *pvParameter)
+{
+    esp_err_t	err;
+    char	*uri = NULL, port[11];
+
+    ESP_LOGI(TAG, "Starting MQTT task");
+    xSemaphorePcounter = xSemaphoreCreateMutex();
+
+    /* event handler and event group for the wifi driver */
+    xEventGroupMQTT = xEventGroupCreate();
+    EventBits_t uxBits;
+
+    esp_mqtt_client_config_t mqtt_cfg = {
+        .broker.address.uri = "mqtt://localhost",
+    };
+    client = esp_mqtt_client_init(&mqtt_cfg);
+    esp_mqtt_client_register_event(client, ESP_EVENT_ANY_ID, mqtt_event_handler, client);
+
+    /*
+     * Task loop forever.
+     */
+    while (1) {
+
+	uxBits = xEventGroupWaitBits(xEventGroupMQTT, TASK_MQTT_CONNECT | TASK_MQTT_DISCONNECT, pdFALSE, pdFALSE, portMAX_DELAY );
+
+	if (uxBits & TASK_MQTT_CONNECT) {
+	    uri = xstrcpy((char *)"mqtt://");
+	    if (strlen(CONFIG_MQTT_USER) && strlen(CONFIG_MQTT_PASS)) {
+		uri = xstrcat(uri, CONFIG_MQTT_USER);
+		uri = xstrcat(uri, (char *)":");
+		uri = xstrcat(uri, CONFIG_MQTT_PASS);
+		uri = xstrcat(uri, (char *)"@");
+	    }
+	    uri = xstrcat(uri, CONFIG_MQTT_SERVER);
+	    if (CONFIG_MQTT_PORT != 1883) {
+		uri = xstrcat(uri, (char *)":");
+		sprintf(port, "%d", CONFIG_MQTT_PORT);
+		uri = xstrcat(uri, port);
+	    }
+
+	    ESP_LOGI(TAG, "Request MQTT connect %s", uri);
+	    err = esp_mqtt_client_set_uri(client, uri);
+            if (err != ESP_OK)
+                ESP_LOGE(TAG, "Set uri %s", esp_err_to_name(err));
+	    err = esp_mqtt_client_start(client);
+	    if (err != ESP_OK)
+	    	ESP_LOGE(TAG, "Result %s", esp_err_to_name(err));
+	    xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECT);
+
+	} else if (uxBits & TASK_MQTT_DISCONNECT) {
+	    ESP_LOGI(TAG, "Request MQTT disconnect");
+	    esp_mqtt_client_stop(client);
+	    xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_DISCONNECT);
+	    xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECTED);
+	}
+    }
+}
+

mercurial