Fri, 31 Mar 2023 20:31:12 +0200
Added volts/current loop calculations. Added millis() timer running on the hardware clock. Completed most of the main state loop. Added MQTT wait for disconnect. MQTT disconnect in two passes, disconnect and stop.
/** * @file task_mqtt.c * @brief The FreeRTOS task to maintain MQTT connections. */ #include "config.h" static const char *TAG = "task_mqtt"; EventGroupHandle_t xEventGroupMQTT; ///< Events MQTT task SemaphoreHandle_t xSemaphorePcounter; ///< Publish counter semaphore. int count_pub = 0; ///< Outstanding published messages. esp_mqtt_client_handle_t client; ///< MQTT client handle const int TASK_MQTT_CONNECT = BIT0; ///< Request MQTT connection const int TASK_MQTT_DISCONNECT = BIT1; ///< Request MQTT disconnect const int TASK_MQTT_CONNECTED = BIT2; ///< MQTT is connected const int TASK_MQTT_DISCONNECTED = BIT3; const char *sensState[] = { "OK", "ERROR" }; ///< Sensor state strings const char *unitMode[] = { "OFF", "ON" }; ///< Units state strings extern BMP280_State *bmp280_state; ///< BMP280 state extern SemaphoreHandle_t xSemaphoreBMP280; ///< BMP280 lock semaphore extern INA219_State *ina219_state; ///< INA219 state extern SemaphoreHandle_t xSemaphoreINA219; ///< INA219 lock semaphore extern WIFI_State *wifi_state; ///< WiFi state extern SemaphoreHandle_t xSemaphoreWiFi; ///< WiFi lock semaphore extern const esp_app_desc_t *app_desc; extern uint32_t Alarm; extern float batteryState; extern float batteryVolts; extern float batteryCurrent; extern float batteryPower; extern float solarVolts; extern float solarCurrent; extern float solarPower; void connect_mqtt(bool state) { if (state) xEventGroupSetBits(xEventGroupMQTT, TASK_MQTT_CONNECT); else xEventGroupSetBits(xEventGroupMQTT, TASK_MQTT_DISCONNECT); } bool ready_mqtt(void) { if (xEventGroupGetBits(xEventGroupMQTT) & TASK_MQTT_CONNECTED) return true; return false; } void wait_mqtt(int time) { EventBits_t uxBits; ESP_LOGI(TAG, "wait_mqtt(%d) 1", time); if (xEventGroupGetBits(xEventGroupMQTT) & TASK_MQTT_CONNECTED) { uxBits = xEventGroupWaitBits(xEventGroupMQTT, TASK_MQTT_DISCONNECTED, pdTRUE, pdFALSE, time / portTICK_PERIOD_MS); ESP_LOGI(TAG, "wait_mqtt(%d) 2 %lu", time, uxBits & TASK_MQTT_DISCONNECTED); } else { ESP_LOGI(TAG, "wait_mqtt(%d) 3 not connected", time); } } /** * @brief Generate the mqtt payload header. * @return Allocated character string with the header. */ char *payload_header(void) { char *tmp; tmp = xstrcpy((char *)"{\"metric\":"); return tmp; } /** * @brief Generate the mqtt topic base part. * @return The topic string allocated in memory. */ char *topic_base(void) { char *tmp; #ifdef CONFIG_CODE_PRODUCTION tmp = xstrcpy((char *)"balkon/"); #endif #ifdef CONFIG_CODE_TESTING tmp = xstrcpy((char *)"wemos/"); #endif return tmp; } /** * @brief The mqtt generic publish function. * @param topic The topic of the mqtt message. * @param payload The payload of the mqtt message. */ void publisher(char *topic, char *payload) { /* * First count, then sent the data. */ if (xSemaphoreTake(xSemaphorePcounter, 10) == pdTRUE) { count_pub++; xSemaphoreGive(xSemaphorePcounter); } else { ESP_LOGE(TAG, "publisher() counter lock"); } if (payload) esp_mqtt_client_publish(client, topic, payload, strlen(payload), 1, 0); else esp_mqtt_client_publish(client, topic, NULL, 0, 1, 0); } void publish(void) { char *topic = NULL, *payload = NULL, buf[64]; const esp_app_desc_t *app_desc = esp_app_get_description(); // {"system":{"battery":70,"alarm":0,"version":"0.2.6","rssi":-56,"wifi":88,"light":{"lux":12.34,"gain":2}},"solar":{"voltage":13.98,"current":234.1,"power":3.272718},"battery":{"voltage":13.21,"current":4.942289,"power":0.065288},"real":{"current":229.1577},"TH":{"temperature":20.2,"humidity":48.3},"output":{"relay1":0,"relay2":0,"dimmer3":0,"dimmer4":0}} // payload = payload_header(); payload = xstrcat(payload, (char *)"{\"system\":{\"battery\":"); sprintf(buf, "%.0f", batteryState); payload = xstrcat(payload, buf); payload = xstrcat(payload, (char *)",\"alarm\":"); sprintf(buf, "%ld", Alarm); payload = xstrcat(payload, buf); payload = xstrcat(payload, (char *)",\"version\":\""); payload = xstrcat(payload, (char *)app_desc->version); payload = xstrcat(payload, (char *)"\",\"rssi\":"); if (xSemaphoreTake(xSemaphoreWiFi, 25) == pdTRUE) { sprintf(buf, "%d", wifi_state->STA_rssi); payload = xstrcat(payload, buf); xSemaphoreGive(xSemaphoreWiFi); } payload = xstrcat(payload, (char *)",\"light\":{\"lux\":"); payload = xstrcat(payload, (char *)",\"gain\":"); payload = xstrcat(payload, (char *)"}},\"solar\":{\"voltage\":"); sprintf(buf, "%.2f", solarVolts); payload = xstrcat(payload, buf); payload = xstrcat(payload, (char *)",\"current\":"); sprintf(buf, "%.1f", solarCurrent); payload = xstrcat(payload, buf); payload = xstrcat(payload, (char *)",\"power\":"); sprintf(buf, "%.3f", solarPower / 1000.0); payload = xstrcat(payload, buf); payload = xstrcat(payload, (char *)"},\"battery\":{\"voltage\":"); sprintf(buf, "%.2f", batteryVolts); payload = xstrcat(payload, buf); payload = xstrcat(payload, (char *)",\"current\":"); sprintf(buf, "%.1f", batteryCurrent); payload = xstrcat(payload, buf); payload = xstrcat(payload, (char *)",\"power\":"); sprintf(buf, "%.3f", batteryPower / 1000.0); payload = xstrcat(payload, buf); payload = xstrcat(payload, (char *)"},\"real\":{\"current\":"); sprintf(buf, "%.1f", (0 - batteryCurrent) + solarCurrent); payload = xstrcat(payload, buf); payload = xstrcat(payload, (char *)"},\"TB\":{\"temperature\":"); if (xSemaphoreTake(xSemaphoreBMP280, 25) == pdTRUE) { sprintf(buf, "%.2f", bmp280_state->temperature); payload = xstrcat(payload, buf); payload = xstrcat(payload, (char *)",\"pressure\":"); sprintf(buf, "%.1f", bmp280_state->pressure / 100.0); payload = xstrcat(payload, buf); xSemaphoreGive(xSemaphoreBMP280); } payload = xstrcat(payload, (char *)"},\"output\":{\"relay1\":"); payload = xstrcat(payload, (char *)",\"relay2\":"); payload = xstrcat(payload, (char *)",\"dimmer3\":"); payload = xstrcat(payload, (char *)",\"dimmer4\":"); payload = xstrcat(payload, (char *)"}}"); topic = topic_base(); topic = xstrcat(topic, (char *)"status"); ESP_LOGI(TAG, "%s %s", topic, payload); // publisher(topic, payload); free(topic); topic = NULL; free(payload); payload = NULL; } static esp_err_t mqtt_event_handler_cb(esp_mqtt_event_handle_t event) { switch (event->event_id) { case MQTT_EVENT_CONNECTED: ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED"); char *topic = topic_base(); topic = xstrcat(topic, (char *)"output/set/#"); ESP_LOGI(TAG, "Subscribe `%s' id %d", topic, esp_mqtt_client_subscribe(client, topic, 0)); free(topic); xEventGroupSetBits(xEventGroupMQTT, TASK_MQTT_CONNECTED); xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_DISCONNECTED); break; case MQTT_EVENT_DISCONNECTED: ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED"); xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECTED); xEventGroupSetBits(xEventGroupMQTT, TASK_MQTT_DISCONNECTED); break; case MQTT_EVENT_SUBSCRIBED: ESP_LOGI(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d", event->msg_id); break; case MQTT_EVENT_UNSUBSCRIBED: ESP_LOGI(TAG, "MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event->msg_id); break; case MQTT_EVENT_PUBLISHED: ESP_LOGI(TAG, "MQTT_EVENT_PUBLISHED, msg_id=%d", event->msg_id); if (xSemaphoreTake(xSemaphorePcounter, 10) == pdTRUE) { if (count_pub) { count_pub--; } xSemaphoreGive(xSemaphorePcounter); } else { ESP_LOGE(TAG, "mqtt_event_handler_cb(() lock error event"); } break; case MQTT_EVENT_DATA: ESP_LOGI(TAG, "MQTT_EVENT_DATA"); printf("TOPIC=%.*s ", event->topic_len, event->topic); printf("DATA=%.*s\r\n", event->data_len, event->data); break; case MQTT_EVENT_ERROR: ESP_LOGI(TAG, "MQTT_EVENT_ERROR"); break; case MQTT_EVENT_BEFORE_CONNECT: ESP_LOGD(TAG, "MQTT_EVENT_BEFORE_CONNECT"); // Configure connection can be here. break; default: ESP_LOGE(TAG, "Other event id:%d", event->event_id); break; } return ESP_OK; } static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data) { mqtt_event_handler_cb(event_data); } /* * Task to read temperature sensors on request. */ void task_mqtt(void *pvParameter) { esp_err_t err; char *uri = NULL, port[11]; ESP_LOGI(TAG, "Starting MQTT task"); xSemaphorePcounter = xSemaphoreCreateMutex(); /* event handler and event group for the wifi driver */ xEventGroupMQTT = xEventGroupCreate(); EventBits_t uxBits; xEventGroupSetBits(xEventGroupMQTT, TASK_MQTT_DISCONNECTED); esp_mqtt_client_config_t mqtt_cfg = { .broker.address.uri = "mqtt://localhost", }; client = esp_mqtt_client_init(&mqtt_cfg); esp_mqtt_client_register_event(client, ESP_EVENT_ANY_ID, mqtt_event_handler, client); /* * Task loop forever. */ while (1) { uxBits = xEventGroupWaitBits(xEventGroupMQTT, TASK_MQTT_CONNECT | TASK_MQTT_DISCONNECT, pdFALSE, pdFALSE, portMAX_DELAY ); if (uxBits & TASK_MQTT_CONNECT) { /* * First build the connect uri. */ uri = xstrcpy((char *)"mqtt://"); if (strlen(CONFIG_MQTT_USER) && strlen(CONFIG_MQTT_PASS)) { uri = xstrcat(uri, CONFIG_MQTT_USER); uri = xstrcat(uri, (char *)":"); uri = xstrcat(uri, CONFIG_MQTT_PASS); uri = xstrcat(uri, (char *)"@"); } uri = xstrcat(uri, CONFIG_MQTT_SERVER); if (CONFIG_MQTT_PORT != 1883) { uri = xstrcat(uri, (char *)":"); sprintf(port, "%d", CONFIG_MQTT_PORT); uri = xstrcat(uri, port); } ESP_LOGI(TAG, "Request MQTT connect %s", uri); /* * Connect to the broker. */ err = esp_mqtt_client_set_uri(client, uri); if (err != ESP_OK) ESP_LOGE(TAG, "Set uri %s", esp_err_to_name(err)); err = esp_mqtt_client_start(client); if (err != ESP_OK) { ESP_LOGE(TAG, "Result %s", esp_err_to_name(err)); } xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECT); } else if (uxBits & TASK_MQTT_DISCONNECT) { ESP_LOGI(TAG, "Request MQTT disconnect"); /* * Unsubscribe if connected */ if (ready_mqtt()) { char *topic = topic_base(); topic = xstrcat(topic, (char *)"output/set/#"); esp_mqtt_client_unsubscribe(client, topic); free(topic); } /* * Disconnect from broker and wait until confirmed. */ err = esp_mqtt_client_disconnect(client); if (err != ESP_OK) { ESP_LOGE(TAG, "Result %s", esp_err_to_name(err)); } xEventGroupWaitBits(xEventGroupMQTT, TASK_MQTT_DISCONNECTED, pdTRUE, pdFALSE, 500 / portTICK_PERIOD_MS); ESP_LOGI(TAG, "disconnect confirmed"); /* * Finally stop the client because new connections start * with a 'esp_mqtt_client_start()' command. * This will take about 5 seconds, but we don't need the network. */ err = esp_mqtt_client_stop(client); if (err != ESP_OK) { ESP_LOGE(TAG, "Result %s", esp_err_to_name(err)); } else { ESP_LOGI(TAG, "stopped"); } xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_DISCONNECT); } } }