Thu, 13 Apr 2023 12:08:24 +0200
Version 0.4.0. Disable normal MQTT logging.
/**
 * @file task_mqtt.c
 * @brief The FreeRTOS task to maintain MQTT connections.
 */

#include "config.h"


static const char           *TAG = "task_mqtt";

EventGroupHandle_t          xEventGroupMQTT;            ///< Events MQTT task
SemaphoreHandle_t           xSemaphorePcounter;         ///< Publish counter semaphore.
int                         count_pub = 0;              ///< Outstanding published messages.
esp_mqtt_client_handle_t    client;                     ///< MQTT client handle

const int TASK_MQTT_CONNECT = BIT0;                     ///< Request MQTT connection
const int TASK_MQTT_DISCONNECT = BIT1;                  ///< Request MQTT disconnect
const int TASK_MQTT_CONNECTED = BIT2;                   ///< MQTT is connected
const int TASK_MQTT_DISCONNECTED = BIT3;                ///< MQTT is disconnected

const char                  *sensState[] = { "OK", "ERROR" };   ///< Sensor state strings
const char                  *unitMode[] = { "OFF", "ON" };      ///< Units state strings

extern BMP280_State         *bmp280_state;              ///< BMP280 state
extern SemaphoreHandle_t    xSemaphoreBMP280;           ///< BMP280 lock semaphore
extern INA219_State         *ina219_state;              ///< INA219 state
extern SemaphoreHandle_t    xSemaphoreINA219;           ///< INA219 lock semaphore
extern APDS9930_State       *apds9930_state;            ///< APDS9930 state
extern SemaphoreHandle_t    xSemaphoreAPDS9930;         ///< APDS9930 lock semaphore
extern WIFI_State           *wifi_state;                ///< WiFi state
extern SemaphoreHandle_t    xSemaphoreWiFi;             ///< WiFi lock semaphore
extern const esp_app_desc_t *app_desc;
extern uint32_t             Alarm;
extern int                  batteryState;
extern float                batteryVolts;
extern float                batteryCurrent;
extern float                batteryPower;
extern float                solarVolts;
extern float                solarCurrent;
extern float                solarPower;
extern uint8_t              Relay1;
extern uint8_t              Relay2;
extern uint8_t              Dimmer3;
extern uint8_t              Dimmer4;


/**
 * @brief Request the MQTT task to connect or disconnect.
 * @param state If true request a connection, if false request a disconnect.
 *
 * This only sets a request bit in the event group, the work is done by
 * task_mqtt(). Requests that make no sense in the current state are logged
 * as a warning and ignored.
 */
void request_mqtt(bool state)
{
    EventBits_t bits = xEventGroupGetBits(xEventGroupMQTT);

    if (state) {
        if (bits & TASK_MQTT_CONNECTED) {
            ESP_LOGW(TAG, "request_mqtt(true) but already connected");
        } else {
            xEventGroupSetBits(xEventGroupMQTT, TASK_MQTT_CONNECT);
        }
        return;
    }

    if (bits & TASK_MQTT_DISCONNECT) {
        ESP_LOGW(TAG, "request_mqtt(false) already in progress");
    } else if (! (bits & TASK_MQTT_CONNECTED)) {
        ESP_LOGW(TAG, "request_mqtt(false) but already disconnected");
    } else {
        xEventGroupSetBits(xEventGroupMQTT, TASK_MQTT_DISCONNECT);
    }
}


/**
 * @brief Test if the MQTT connection is up.
 * @return true if the TASK_MQTT_CONNECTED bit is set, else false.
 */
bool ready_mqtt(void)
{
    return (xEventGroupGetBits(xEventGroupMQTT) & TASK_MQTT_CONNECTED) != 0;
}


/**
 * @brief If connected, wait until the disconnected bit is set or a timeout.
 * @param time The maximum time to wait in milliseconds.
 *
 * Returns immediately if the connected bit is not set. The
 * TASK_MQTT_DISCONNECTED bit is cleared when the wait ends because it was set.
 */
void wait_mqtt(int time)
{
    if (xEventGroupGetBits(xEventGroupMQTT) & TASK_MQTT_CONNECTED) {
        xEventGroupWaitBits(xEventGroupMQTT, TASK_MQTT_DISCONNECTED, pdTRUE, pdFALSE, time / portTICK_PERIOD_MS);
    }
}


/**
 * @brief Generate the mqtt topic base part.
 * @return The topic string allocated in memory, the caller must free it.
 *         NULL if neither CONFIG_CODE_PRODUCTION nor CONFIG_CODE_TESTING
 *         is defined.
 */
char *topic_base(void)
{
    /* Initialized so that a build without either option never returns garbage. */
    char    *tmp = NULL;

#ifdef CONFIG_CODE_PRODUCTION
    tmp = xstrcpy((char *)"balkon/");
#endif
#ifdef CONFIG_CODE_TESTING
    tmp = xstrcpy((char *)"wemos/");
#endif
    return tmp;
}


/**
 * @brief The mqtt generic publish function.
 * @param topic The topic of the mqtt message.
 * @param payload The payload of the mqtt message, NULL for an empty message.
 *
 * Messages are published with QoS 1 and without the retain flag. The counter
 * of outstanding messages is incremented before sending and decremented again
 * by the MQTT_EVENT_PUBLISHED event.
 */
void publisher(char *topic, char *payload)
{
    bool    counted = false;
    int     msg_id;

    /*
     * First count, then send the data.
     */
    if (xSemaphoreTake(xSemaphorePcounter, 10) == pdTRUE) {
        count_pub++;
        counted = true;
        xSemaphoreGive(xSemaphorePcounter);
    } else {
        ESP_LOGE(TAG, "publisher() counter lock");
    }

    if (payload) {
        msg_id = esp_mqtt_client_publish(client, topic, payload, strlen(payload), 1, 0);
    } else {
        msg_id = esp_mqtt_client_publish(client, topic, NULL, 0, 1, 0);
    }

    /*
     * A failed publish will never get a MQTT_EVENT_PUBLISHED event, so take
     * it out of the outstanding counter again or it will never reach zero.
     */
    if (msg_id < 0) {
        ESP_LOGE(TAG, "publisher() publish failed (%d)", msg_id);
        if (counted && xSemaphoreTake(xSemaphorePcounter, 10) == pdTRUE) {
            if (count_pub) {
                count_pub--;
            }
            xSemaphoreGive(xSemaphorePcounter);
        }
    }
}


/**
 * @brief Placeholder, currently does nothing.
 * @param check Unused.
 * @param topic Unused.
 * @param data Unused.
 * @return Always false.
 */
bool do_event_data(char *check, char *topic, char *data)
{
    return false;
}


/**
 * @brief Build the JSON status message and publish it on the topic
 *        `<topic base>status`.
 *
 * Sensor and WiFi values are read while holding their lock semaphore. If a
 * lock cannot be taken within 25 ticks the affected values are sent as JSON
 * null so that the message stays valid JSON.
 */
void publish(void)
{
    char                    *topic = NULL, *payload = NULL, buf[64];
    const esp_app_desc_t    *desc = esp_app_get_description();
    int                     Quality = 0;

    // {"system":{"battery":40,"alarm":0,"version":"0.3.1","rssi":-77,"wifi":46,"light":{"lux":0.1,"gain":3,"agl":0}},"solar":{"voltage":0.31,"current":0,"power":0},"battery":{"voltage":12.27,"current":53.5,"power":0.657},"real":{"current":-53.5},"TB":{"temperature":8.88,"pressure":1013.2},"output":{"relay1":0,"relay2":0,"dimmer3":90,"dimmer4":80}}
    payload = xstrcpy((char *)"{\"system\":{\"battery\":");
    snprintf(buf, sizeof buf, "%d", batteryState);
    payload = xstrcat(payload, buf);
    payload = xstrcat(payload, (char *)",\"alarm\":");
    snprintf(buf, sizeof buf, "%lu", (unsigned long)Alarm);
    payload = xstrcat(payload, buf);
    payload = xstrcat(payload, (char *)",\"version\":\"");
    payload = xstrcat(payload, (char *)desc->version);
    payload = xstrcat(payload, (char *)"\",\"rssi\":");

    if (xSemaphoreTake(xSemaphoreWiFi, 25) == pdTRUE) {
        snprintf(buf, sizeof buf, "%d", wifi_state->STA_rssi);
        payload = xstrcat(payload, buf);
        payload = xstrcat(payload, (char *)",\"wifi\":");
        /* Map -100 .. -50 dBm linear to 0 .. 100 percent. */
        if (wifi_state->STA_rssi <= -100) {
            Quality = 0;
        } else if (wifi_state->STA_rssi >= -50) {
            Quality = 100;
        } else {
            Quality = 2 * (wifi_state->STA_rssi + 100);
        }
        snprintf(buf, sizeof buf, "%d", Quality);
        payload = xstrcat(payload, buf);
        xSemaphoreGive(xSemaphoreWiFi);
    } else {
        ESP_LOGW(TAG, "publish() WiFi lock");
        payload = xstrcat(payload, (char *)"null,\"wifi\":null");
    }

    payload = xstrcat(payload, (char *)",\"light\":{\"lux\":");
    if (xSemaphoreTake(xSemaphoreAPDS9930, 25) == pdTRUE) {
        snprintf(buf, sizeof buf, "%.1f", apds9930_state->ambient_light);
        payload = xstrcat(payload, buf);
        payload = xstrcat(payload, (char *)",\"gain\":");
        snprintf(buf, sizeof buf, "%d", apds9930_state->gain);
        payload = xstrcat(payload, buf);
        payload = xstrcat(payload, (char *)",\"agl\":");
        snprintf(buf, sizeof buf, "%d", apds9930_state->aglbit);
        payload = xstrcat(payload, buf);
        xSemaphoreGive(xSemaphoreAPDS9930);
    } else {
        ESP_LOGW(TAG, "publish() APDS9930 lock");
        payload = xstrcat(payload, (char *)"null,\"gain\":null,\"agl\":null");
    }

    payload = xstrcat(payload, (char *)"}},\"solar\":{\"voltage\":");
    snprintf(buf, sizeof buf, "%.2f", solarVolts);
    payload = xstrcat(payload, buf);
    payload = xstrcat(payload, (char *)",\"current\":");
    snprintf(buf, sizeof buf, "%.1f", solarCurrent);
    payload = xstrcat(payload, buf);
    payload = xstrcat(payload, (char *)",\"power\":");
    snprintf(buf, sizeof buf, "%.3f", solarPower / 1000.0);
    payload = xstrcat(payload, buf);

    payload = xstrcat(payload, (char *)"},\"battery\":{\"voltage\":");
    snprintf(buf, sizeof buf, "%.2f", batteryVolts);
    payload = xstrcat(payload, buf);
    payload = xstrcat(payload, (char *)",\"current\":");
    snprintf(buf, sizeof buf, "%.1f", batteryCurrent);
    payload = xstrcat(payload, buf);
    payload = xstrcat(payload, (char *)",\"power\":");
    snprintf(buf, sizeof buf, "%.3f", batteryPower / 1000.0);
    payload = xstrcat(payload, buf);

    payload = xstrcat(payload, (char *)"},\"real\":{\"current\":");
    snprintf(buf, sizeof buf, "%.1f", (0 - batteryCurrent) + solarCurrent);
    payload = xstrcat(payload, buf);

    payload = xstrcat(payload, (char *)"},\"TB\":{\"temperature\":");
    if (xSemaphoreTake(xSemaphoreBMP280, 25) == pdTRUE) {
        snprintf(buf, sizeof buf, "%.2f", bmp280_state->temperature);
        payload = xstrcat(payload, buf);
        payload = xstrcat(payload, (char *)",\"pressure\":");
        snprintf(buf, sizeof buf, "%.1f", bmp280_state->pressure / 100.0);
        payload = xstrcat(payload, buf);
        xSemaphoreGive(xSemaphoreBMP280);
    } else {
        ESP_LOGW(TAG, "publish() BMP280 lock");
        payload = xstrcat(payload, (char *)"null,\"pressure\":null");
    }

    payload = xstrcat(payload, (char *)"},\"output\":{\"relay1\":");
    snprintf(buf, sizeof buf, "%d", Relay1);
    payload = xstrcat(payload, buf);
    payload = xstrcat(payload, (char *)",\"relay2\":");
    snprintf(buf, sizeof buf, "%d", Relay2);
    payload = xstrcat(payload, buf);
    payload = xstrcat(payload, (char *)",\"dimmer3\":");
    snprintf(buf, sizeof buf, "%d", Dimmer3);
    payload = xstrcat(payload, buf);
    payload = xstrcat(payload, (char *)",\"dimmer4\":");
    snprintf(buf, sizeof buf, "%d", Dimmer4);
    payload = xstrcat(payload, buf);
    payload = xstrcat(payload, (char *)"}}");

    topic = topic_base();
    topic = xstrcat(topic, (char *)"status");
    ESP_LOGI(TAG, "%s %s", topic, payload);
    publisher(topic, payload);
    free(topic);
    topic = NULL;
    free(payload);
    payload = NULL;
}


/**
 * @brief Handle a received message for one of the `output/set/N` topics.
 * @param event The MQTT_EVENT_DATA event.
 *
 * The payload is converted with atoi() and stored in the matching output
 * variable and in NVS. Only an exact topic match is accepted; events with
 * another topic length (including events without a topic) are ignored.
 */
static void mqtt_handle_data(esp_mqtt_event_handle_t event)
{
    struct {
        uint8_t     *value;     /* Output variable to update.   */
        const char  *key;       /* Key used for nvsio_write_u8. */
    } outputs[] = {
        { &Relay1,  "out1" },
        { &Relay2,  "out2" },
        { &Dimmer3, "out3" },
        { &Dimmer4, "out4" },
    };
    char    data[65], topic[128], *check = NULL;
    size_t  check_len;

    /* Make NUL terminated copies, too long values become empty strings. */
    if (event->data_len > 0 && event->data_len < (int)sizeof data) {
        snprintf(data, sizeof data, "%.*s", event->data_len, event->data);
    } else {
        data[0] = '\0';
    }
    if (event->topic_len > 0 && event->topic_len < (int)sizeof topic) {
        snprintf(topic, sizeof topic, "%.*s", event->topic_len, event->topic);
    } else {
        topic[0] = '\0';
    }
    ESP_LOGI(TAG, "MQTT_EVENT_DATA %s %s", topic, data);

    check = topic_base();
    check = xstrcat(check, (char *)"output/set/1");
    check_len = strlen(check);

    /*
     * All four topics have the same length. Comparing only topic_len bytes
     * would let a shorter topic, or an event without topic, match every
     * output, so insist on the exact length first.
     */
    if (event->topic_len > 0 && (size_t)event->topic_len == check_len) {
        for (size_t i = 0; i < sizeof outputs / sizeof outputs[0]; i++) {
            check[check_len - 1] = (char)('1' + i);
            if (memcmp(check, event->topic, check_len) == 0) {
                ESP_LOGD(TAG, "Got %s `%s' %d", check, data, atoi(data));
                *outputs[i].value = (uint8_t)atoi(data);
                nvsio_write_u8((char *)outputs[i].key, *outputs[i].value);
                break;
            }
        }
    }

    free(check);
    check = NULL;
}


/**
 * @brief Process one event from the MQTT client.
 * @param event The event to process.
 * @return Always ESP_OK.
 */
static esp_err_t mqtt_event_handler_cb(esp_mqtt_event_handle_t event)
{
    switch (event->event_id) {

        case MQTT_EVENT_CONNECTED: {
            char    *subscr = NULL;
            int     msgid;

            ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED");
            subscr = topic_base();
            subscr = xstrcat(subscr, (char *)"output/set/#");
            msgid = esp_mqtt_client_subscribe(client, subscr, 0);
            ESP_LOGD(TAG, "Subscribe `%s' id %d", subscr, msgid);
            free(subscr);
            subscr = NULL;
            xEventGroupSetBits(xEventGroupMQTT, TASK_MQTT_CONNECTED);
            xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_DISCONNECTED);
            break;
        }

        case MQTT_EVENT_DISCONNECTED:
            ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED");
            xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECTED);
            xEventGroupSetBits(xEventGroupMQTT, TASK_MQTT_DISCONNECTED);
            break;

        case MQTT_EVENT_SUBSCRIBED:
            ESP_LOGD(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d", event->msg_id);
            break;

        case MQTT_EVENT_UNSUBSCRIBED:
            ESP_LOGI(TAG, "MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event->msg_id);
            break;

        case MQTT_EVENT_PUBLISHED: {
            int     pending;

            if (xSemaphoreTake(xSemaphorePcounter, 10) == pdTRUE) {
                if (count_pub) {
                    count_pub--;
                }
                /* Snapshot for the log line while the lock is held. */
                pending = count_pub;
                xSemaphoreGive(xSemaphorePcounter);
            } else {
                ESP_LOGE(TAG, "mqtt_event_handler_cb(() lock error event");
                pending = count_pub;
            }
            ESP_LOGI(TAG, "MQTT_EVENT_PUBLISHED, msg_id=%d, %d msgs in queue", event->msg_id, pending);
            break;
        }

        case MQTT_EVENT_DATA:
            mqtt_handle_data(event);
            break;

        case MQTT_EVENT_ERROR:
            ESP_LOGI(TAG, "MQTT_EVENT_ERROR");
            break;

        case MQTT_EVENT_BEFORE_CONNECT:
            ESP_LOGD(TAG, "MQTT_EVENT_BEFORE_CONNECT");
            // Configure connection can be here.
            break;

        default:
            ESP_LOGE(TAG, "Other event id:%d", event->event_id);
            break;
    }

    return ESP_OK;
}


/**
 * @brief Event loop entry point, forwards the event data to
 *        mqtt_event_handler_cb().
 * @param handler_args Unused.
 * @param base Unused.
 * @param event_id Unused.
 * @param event_data The MQTT event.
 */
static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data)
{
    mqtt_event_handler_cb(event_data);
}


/**
 * @brief Task that connects to and disconnects from the MQTT broker
 *        on request, see request_mqtt().
 * @param pvParameter Unused.
 */
void task_mqtt(void *pvParameter)
{
    esp_err_t   err;
    char        *uri = NULL, port[11];
    EventBits_t uxBits;

    ESP_LOGI(TAG, "Starting MQTT task");
    esp_log_level_set("MQTT_CLIENT", ESP_LOG_ERROR);
    xSemaphorePcounter = xSemaphoreCreateMutex();

    /* event handler and event group for the MQTT client */
    xEventGroupMQTT = xEventGroupCreate();
    xEventGroupSetBits(xEventGroupMQTT, TASK_MQTT_DISCONNECTED);

    /* Initial uri only, the real one is set on each connect request. */
    esp_mqtt_client_config_t mqtt_cfg = {
        .broker.address.uri = "mqtt://localhost",
    };
    client = esp_mqtt_client_init(&mqtt_cfg);
    esp_mqtt_client_register_event(client, ESP_EVENT_ANY_ID, mqtt_event_handler, client);

    /*
     * Task loop forever.
     */
    while (1) {

        /* The request bits are cleared explicitly when a request is handled. */
        uxBits = xEventGroupWaitBits(xEventGroupMQTT, TASK_MQTT_CONNECT | TASK_MQTT_DISCONNECT, pdFALSE, pdFALSE, portMAX_DELAY );

        if (uxBits & TASK_MQTT_CONNECT) {

            /*
             * First build the connect uri.
             */
            uri = xstrcpy((char *)"mqtt://");
            if (strlen(CONFIG_MQTT_USER) && strlen(CONFIG_MQTT_PASS)) {
                uri = xstrcat(uri, CONFIG_MQTT_USER);
                uri = xstrcat(uri, (char *)":");
                uri = xstrcat(uri, CONFIG_MQTT_PASS);
                uri = xstrcat(uri, (char *)"@");
            }
            uri = xstrcat(uri, CONFIG_MQTT_SERVER);
            if (CONFIG_MQTT_PORT != 1883) {
                uri = xstrcat(uri, (char *)":");
                snprintf(port, sizeof port, "%d", CONFIG_MQTT_PORT);
                uri = xstrcat(uri, port);
            }
            /* NOTE(review): this logs the broker user and password when set. */
            ESP_LOGI(TAG, "Request MQTT connect %s", uri);

            /*
             * Connect to the broker.
             */
            err = esp_mqtt_client_set_uri(client, uri);
            if (err != ESP_OK) {
                ESP_LOGE(TAG, "Set uri %s", esp_err_to_name(err));
            }
            /* The uri is rebuilt on every request, release this one. */
            free(uri);
            uri = NULL;

            err = esp_mqtt_client_start(client);
            if (err != ESP_OK) {
                ESP_LOGE(TAG, "Result %s", esp_err_to_name(err));
            }

            xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECT);

        } else if (uxBits & TASK_MQTT_DISCONNECT) {

            ESP_LOGI(TAG, "Request MQTT disconnect");

            /*
             * Unsubscribe if connected
             */
            if (ready_mqtt()) {
                char *topic = topic_base();
                topic = xstrcat(topic, (char *)"output/set/#");
                esp_mqtt_client_unsubscribe(client, topic);
                free(topic);
            }

            /*
             * Disconnect from broker and wait until confirmed.
             */
            err = esp_mqtt_client_disconnect(client);
            if (err != ESP_OK) {
                ESP_LOGE(TAG, "Result %s", esp_err_to_name(err));
            }
            xEventGroupWaitBits(xEventGroupMQTT, TASK_MQTT_DISCONNECTED, pdTRUE, pdFALSE, 500 / portTICK_PERIOD_MS);

            /*
             * Finally stop the client because new connections start
             * with a 'esp_mqtt_client_start()' command.
             * This will take about 5 seconds, but we don't need the network.
             */
            err = esp_mqtt_client_stop(client);
            if (err != ESP_OK) {
                ESP_LOGE(TAG, "esp_mqtt_client_stop() result %s", esp_err_to_name(err));
            }

            /* Force the state bits, the disconnect event may not have arrived. */
            xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECTED);
            xEventGroupSetBits(xEventGroupMQTT, TASK_MQTT_DISCONNECTED);
            xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_DISCONNECT);
            ESP_LOGI(TAG, "Request MQTT disconnect done");
        }
    }
}