diff -r d0155c16e992 -r b1f38105ca7e main/task_mqtt.c --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/main/task_mqtt.c Thu Mar 30 17:05:05 2023 +0200 @@ -0,0 +1,313 @@ +/** + * @file task_mqtt.c + * @brief The FreeRTOS task to maintain MQTT connections. + */ + + +#include "config.h" + + +static const char *TAG = "task_mqtt"; + +EventGroupHandle_t xEventGroupMQTT; ///< Events MQTT task +SemaphoreHandle_t xSemaphorePcounter; ///< Publish counter semaphore. +int count_pub = 0; ///< Outstanding published messages. +esp_mqtt_client_handle_t client; ///< MQTT client handle + +const int TASK_MQTT_CONNECT = BIT0; ///< Request MQTT connection +const int TASK_MQTT_DISCONNECT = BIT1; ///< Request MQTT disconnect +const int TASK_MQTT_CONNECTED = BIT2; ///< MQTT is connected + +const char *sensState[] = { "OK", "ERROR" }; ///< Sensor state strings +const char *unitMode[] = { "OFF", "ON" }; ///< Units state strings + +extern BMP280_State *bmp280_state; ///< BMP280 state +extern SemaphoreHandle_t xSemaphoreBMP280; ///< BMP280 lock semaphore +extern INA219_State *ina219_state; ///< INA219 state +extern SemaphoreHandle_t xSemaphoreINA219; ///< INA219 lock semaphore +extern WIFI_State *wifi_state; ///< WiFi state +extern SemaphoreHandle_t xSemaphoreWiFi; ///< WiFi lock semaphore +extern const esp_app_desc_t *app_desc; + +extern uint32_t Alarm; +extern float batteryState; +extern float batteryVolts; +extern float batteryCurrent; +extern float batteryPower; +extern float solarVolts; +extern float solarCurrent; +extern float solarPower; + + +void connect_mqtt(bool state) +{ + if (state) + xEventGroupSetBits(xEventGroupMQTT, TASK_MQTT_CONNECT); + else + xEventGroupSetBits(xEventGroupMQTT, TASK_MQTT_DISCONNECT); +} + + + +bool ready_mqtt(void) +{ + if (xEventGroupGetBits(xEventGroupMQTT) & TASK_MQTT_CONNECTED) + return true; + return false; +} + + + +/** + * @brief Generate the mqtt payload header. + * @return Allocated character string with the header. + */ +char *payload_header(void) +{ + char *tmp; + + tmp = xstrcpy((char *)"{\"metric\":"); + return tmp; +} + + + +/** + * @brief Generate the mqtt topic base part. + * @return The topic string allocated in memory. + */ +char *topic_base(void) +{ + char *tmp; + +#ifdef CONFIG_CODE_PRODUCTION + tmp = xstrcpy((char *)"balkon/"); +#endif +#ifdef CONFIG_CODE_TESTING + tmp = xstrcpy((char *)"wemos/"); +#endif + + return tmp; +} + + + +/** + * @brief The mqtt generic publish function. + * @param topic The topic of the mqtt message. + * @param payload The payload of the mqtt message. + */ +void publisher(char *topic, char *payload) +{ + /* + * First count, then sent the data. + */ + if (xSemaphoreTake(xSemaphorePcounter, 10) == pdTRUE) { + count_pub++; + xSemaphoreGive(xSemaphorePcounter); + } else { + ESP_LOGE(TAG, "publisher() counter lock"); + } + + if (payload) + esp_mqtt_client_publish(client, topic, payload, strlen(payload), 1, 0); + else + esp_mqtt_client_publish(client, topic, NULL, 0, 1, 0); +} + + + +void publish(void) +{ + char *topic = NULL, *payload = NULL, buf[64]; + + // {"system":{"battery":70,"alarm":0,"version":"0.2.6","rssi":-56,"wifi":88,"light":{"lux":12.34,"gain":2}},"solar":{"voltage":13.98,"current":234.1,"power":3.272718},"battery":{"voltage":13.21,"current":4.942289,"power":0.065288},"real":{"current":229.1577},"TH":{"temperature":20.2,"humidity":48.3},"output":{"relay1":0,"relay2":0,"dimmer3":0,"dimmer4":0}} + // + payload = payload_header(); + payload = xstrcat(payload, (char *)"{\"system\":{\"battery\":"); + sprintf(buf, "%.0f", batteryState); + payload = xstrcat(payload, buf); + payload = xstrcat(payload, (char *)",\"alarm\":"); + sprintf(buf, "%ld", Alarm); + payload = xstrcat(payload, buf); + payload = xstrcat(payload, (char *)",\"version\":\""); + payload = xstrcat(payload, (char *)app_desc->version); + payload = xstrcat(payload, (char *)",\"rssi\":"); + + payload = xstrcat(payload, (char *)",\"light\":{\"lux\":"); + + payload = xstrcat(payload, (char *)",\"gain\":"); + + payload = xstrcat(payload, (char *)"}},\"solar\":{\"voltage\":"); + sprintf(buf, "%.2f", solarVolts); + payload = xstrcat(payload, buf); + payload = xstrcat(payload, (char *)",\"current\":"); + sprintf(buf, "%.1f", solarCurrent); + payload = xstrcat(payload, buf); + payload = xstrcat(payload, (char *)",\"power\":"); + sprintf(buf, "%.3f", solarPower / 1000.0); + payload = xstrcat(payload, buf); + payload = xstrcat(payload, (char *)"},\"battery\":{\"voltage\":"); + sprintf(buf, "%.2f", batteryVolts); + payload = xstrcat(payload, buf); + payload = xstrcat(payload, (char *)",\"current\":"); + sprintf(buf, "%.1f", batteryCurrent); + payload = xstrcat(payload, buf); + payload = xstrcat(payload, (char *)",\"power\":"); + sprintf(buf, "%.3f", batteryPower / 1000.0); + payload = xstrcat(payload, buf); + payload = xstrcat(payload, (char *)"},\"real\":{\"current\":"); + payload = xstrcat(payload, (char *)"},\"TB\":{\"temperature\":"); + if (xSemaphoreTake(xSemaphoreBMP280, 25) == pdTRUE) { + sprintf(buf, "%.2f", bmp280_state->temperature); + payload = xstrcat(payload, buf); + payload = xstrcat(payload, (char *)",\"pressure\":"); + sprintf(buf, "%.1f", bmp280_state->pressure / 100.0); + payload = xstrcat(payload, buf); + xSemaphoreGive(xSemaphoreBMP280); + } + payload = xstrcat(payload, (char *)"},\"output\":{\"relay1\":"); + + payload = xstrcat(payload, (char *)",\"relay2\":"); + + payload = xstrcat(payload, (char *)",\"dimmer3\":"); + + payload = xstrcat(payload, (char *)",\"dimmer4\":"); + + payload = xstrcat(payload, (char *)"}}"); + topic = topic_base(); + topic = xstrcat(topic, (char *)"status"); + publisher(topic, payload); + free(topic); + topic = NULL; + free(payload); + payload = NULL; +} + + + +static esp_err_t mqtt_event_handler_cb(esp_mqtt_event_handle_t event) +{ + switch (event->event_id) { + + case MQTT_EVENT_CONNECTED: + ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED"); + xEventGroupSetBits(xEventGroupMQTT, TASK_MQTT_CONNECTED); + break; + + case MQTT_EVENT_DISCONNECTED: + ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED"); + xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECTED); + break; + + case MQTT_EVENT_SUBSCRIBED: + ESP_LOGI(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d", event->msg_id); + break; + + case MQTT_EVENT_UNSUBSCRIBED: + ESP_LOGI(TAG, "MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event->msg_id); + break; + + case MQTT_EVENT_PUBLISHED: + ESP_LOGI(TAG, "MQTT_EVENT_PUBLISHED, msg_id=%d", event->msg_id); + if (xSemaphoreTake(xSemaphorePcounter, 10) == pdTRUE) { + if (count_pub) { + count_pub--; + } + xSemaphoreGive(xSemaphorePcounter); + } else { + ESP_LOGE(TAG, "mqtt_event_handler_cb(() lock error event"); + } + break; + + case MQTT_EVENT_DATA: + ESP_LOGI(TAG, "MQTT_EVENT_DATA"); + printf("TOPIC=%.*s\r\n", event->topic_len, event->topic); + printf("DATA=%.*s\r\n", event->data_len, event->data); + break; + + case MQTT_EVENT_ERROR: + ESP_LOGI(TAG, "MQTT_EVENT_ERROR"); + break; + + case MQTT_EVENT_BEFORE_CONNECT: + ESP_LOGI(TAG, "MQTT_EVENT_BEFORE_CONNECT"); + // Configure connection can be here. + break; + + default: + ESP_LOGI(TAG, "Other event id:%d", event->event_id); + break; + } + return ESP_OK; +} + + + +static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data) { + mqtt_event_handler_cb(event_data); +} + + + +/* + * Task to read temperature sensors on request. + */ +void task_mqtt(void *pvParameter) +{ + esp_err_t err; + char *uri = NULL, port[11]; + + ESP_LOGI(TAG, "Starting MQTT task"); + xSemaphorePcounter = xSemaphoreCreateMutex(); + + /* event handler and event group for the wifi driver */ + xEventGroupMQTT = xEventGroupCreate(); + EventBits_t uxBits; + + esp_mqtt_client_config_t mqtt_cfg = { + .broker.address.uri = "mqtt://localhost", + }; + client = esp_mqtt_client_init(&mqtt_cfg); + esp_mqtt_client_register_event(client, ESP_EVENT_ANY_ID, mqtt_event_handler, client); + + /* + * Task loop forever. + */ + while (1) { + + uxBits = xEventGroupWaitBits(xEventGroupMQTT, TASK_MQTT_CONNECT | TASK_MQTT_DISCONNECT, pdFALSE, pdFALSE, portMAX_DELAY ); + + if (uxBits & TASK_MQTT_CONNECT) { + uri = xstrcpy((char *)"mqtt://"); + if (strlen(CONFIG_MQTT_USER) && strlen(CONFIG_MQTT_PASS)) { + uri = xstrcat(uri, CONFIG_MQTT_USER); + uri = xstrcat(uri, (char *)":"); + uri = xstrcat(uri, CONFIG_MQTT_PASS); + uri = xstrcat(uri, (char *)"@"); + } + uri = xstrcat(uri, CONFIG_MQTT_SERVER); + if (CONFIG_MQTT_PORT != 1883) { + uri = xstrcat(uri, (char *)":"); + sprintf(port, "%d", CONFIG_MQTT_PORT); + uri = xstrcat(uri, port); + } + + ESP_LOGI(TAG, "Request MQTT connect %s", uri); + err = esp_mqtt_client_set_uri(client, uri); + if (err != ESP_OK) + ESP_LOGE(TAG, "Set uri %s", esp_err_to_name(err)); + err = esp_mqtt_client_start(client); + if (err != ESP_OK) + ESP_LOGE(TAG, "Result %s", esp_err_to_name(err)); + xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECT); + + } else if (uxBits & TASK_MQTT_DISCONNECT) { + ESP_LOGI(TAG, "Request MQTT disconnect"); + esp_mqtt_client_stop(client); + xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_DISCONNECT); + xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECTED); + } + } +} +