Sun, 17 Nov 2019 12:58:39 +0100
Temporary white splash screen. Select DS18B20 sensor per unit. Changed units result logging
/** * @file task_mqtt.c * @brief The FreeRTOS task to maintain MQTT connections. */ #include "config.h" static const char *TAG = "task_mqtt"; EventGroupHandle_t xEventGroupMQTT; ///< Events MQTT task SemaphoreHandle_t xSemaphorePcounter; ///< Publish counter semaphore. uint64_t Sequence = 0; ///< Sequence stored in NVS nvs_handle_t seq_handle; ///< NVS handle int count_pub = 0; ///< Outstanding published messages. esp_mqtt_client_handle_t client; ///< MQTT client handle const int TASK_MQTT_CONNECT = BIT0; ///< Request MQTT connection const int TASK_MQTT_DISCONNECT = BIT1; ///< Request MQTT disconnect const int TASK_MQTT_CONNECTED = BIT2; ///< MQTT is connected const char *sensState[] = { "OK", "ERROR" }; const char *unitMode[] = { "OFF", "ON" }; extern DS18B20_State *ds18b20_state; ///< DS18B20 state extern SemaphoreHandle_t xSemaphoreDS18B20; ///< DS18B20 lock semaphore extern ADC_State *adc_state; ///< ADC state extern SemaphoreHandle_t xSemaphoreADC; ///< ADC lock semaphore extern WIFI_State *wifi_state; ///< WiFi state extern SemaphoreHandle_t xSemaphoreWiFi; ///< WiFi lock semaphore extern unit_t units[3]; extern SemaphoreHandle_t xSemaphoreUnits; extern const esp_app_desc_t *app_desc; void connect_mqtt(bool state) { if (state) xEventGroupSetBits(xEventGroupMQTT, TASK_MQTT_CONNECT); else xEventGroupSetBits(xEventGroupMQTT, TASK_MQTT_DISCONNECT); } bool ready_mqtt(void) { if (xEventGroupGetBits(xEventGroupMQTT) & TASK_MQTT_CONNECTED) return true; return false; } char *payload_header(void) { char *tmp, buf[128]; esp_err_t err; tmp = xstrcpy((char *)"{\"seq\":"); sprintf(buf, "%lld", Sequence++); err = nvs_set_u64(seq_handle, "Sequence_cnt", Sequence); if (err != ESP_OK) ESP_LOGE(TAG, "Error %s write Sequence to NVS", esp_err_to_name(err)); tmp = xstrcat(tmp, buf); tmp = xstrcat(tmp, (char *)",\"metric\":"); return tmp; } char *topic_base(char *msgtype) { char *tmp; tmp = xstrcpy((char *)"mbv1.0/co2meters/"); tmp = xstrcat(tmp, msgtype); tmp = xstrcat(tmp, (char *)"/"); tmp = xstrcat(tmp, config.hostname); return tmp; } void publisher(char *topic, char *payload) { /* * First count, then sent the data. */ if (xSemaphoreTake(xSemaphorePcounter, 10) == pdTRUE) { count_pub++; xSemaphoreGive(xSemaphorePcounter); } else { ESP_LOGE(TAG, "publisher() counter lock"); } if (payload) esp_mqtt_client_publish(client, topic, payload, strlen(payload), 1, 0); else esp_mqtt_client_publish(client, topic, NULL, 0, 1, 0); } char *unit_data(int i) { char *payload = NULL; char buf[128]; if (xSemaphoreTake(xSemaphoreUnits, 25) == pdTRUE) { payload = xstrcpy((char *)"{\"uuid\":\""); payload = xstrcat(payload, units[i].uuid); payload = xstrcat(payload, (char *)"\",\"alias\":\""); payload = xstrcat(payload, units[i].alias); payload = xstrcat(payload, (char *)"\",\"mode\":\""); payload = xstrcat(payload, (char *)unitMode[units[i].mode]); payload = xstrcat(payload, (char *)"\",\"alarm\":"); sprintf(buf, "%d", units[i].alarm); payload = xstrcat(payload, buf); // temperature_state temperature_address temperature payload = xstrcat(payload, (char *)",\"temperature\":{\"state\":\""); payload = xstrcat(payload, (char *)sensState[units[i].temperature_state]); payload = xstrcat(payload, (char *)"\",\"address\":\""); payload = xstrcat(payload, (char *)units[i].temperature_rom_code); payload = xstrcat(payload, (char *)"\",\"temperature\":"); sprintf(buf, "%.3f", units[i].temperature / 1000.0); payload = xstrcat(payload, buf); // pressure_state pressure_channel pressure_voltage pressure_zero pressure payload = xstrcat(payload, (char *)"},\"pressure\":{\"state\":\""); payload = xstrcat(payload, (char *)sensState[units[i].pressure_state]); payload = xstrcat(payload, (char *)"\",\"channel\":"); sprintf(buf, "%d", units[i].pressure_channel); payload = xstrcat(payload, buf); payload = xstrcat(payload, (char *)",\"voltage\":"); sprintf(buf, "%.3f", units[i].pressure_voltage / 1000.0); payload = xstrcat(payload, buf); payload = xstrcat(payload, (char *)",\"zero\":"); sprintf(buf, "%.3f", units[i].pressure_zero / 1000.0); payload = xstrcat(payload, buf); payload = xstrcat(payload, (char *)",\"bar\":"); sprintf(buf, "%.2f", units[i].pressure / 1000.0); payload = xstrcat(payload, buf); payload = xstrcat(payload, (char *)"}}"); xSemaphoreGive(xSemaphoreUnits); } else { ESP_LOGE(TAG, "unit_data(%d) lock error", i); } return payload; } void publishUnits(void) { char *topic = NULL, *payload = NULL, *payloadu = NULL; int i; bool comma = false; payload = payload_header(); payload = xstrcat(payload, (char *)"{\"units\":["); for (i = 0; i < 3; i++) { if (comma) payload = xstrcat(payload, (char *)","); payloadu = unit_data(i); payload = xstrcat(payload, payloadu); comma = true; free(payloadu); payloadu = NULL; } payload = xstrcat(payload, (char *)"]}}"); topic = topic_base((char *)"DBIRTH"); publisher(topic, payload); free(topic); topic = NULL; free(payload); payload = NULL; } void publishNode(void) { char *topic = NULL, *payload = NULL, buf[64]; esp_chip_info_t chip_info; esp_chip_info(&chip_info); payload = payload_header(); payload = xstrcat(payload, (char *)"{\"uuid\":\""); payload = xstrcat(payload, config.uuid); payload = xstrcat(payload, (char *)"\","); payload = xstrcat(payload, (char *)"\"properties\":{\"hardwaremake\":\"Unknown\",\"hardwaremodel\":\""); sprintf(buf, "ESP32 %d cores rev %d, WiFi bgn", chip_info.cores, chip_info.revision); payload = xstrcat(payload, buf); payload = xstrcat(payload, (char *)"\",\"os\":\"FreeRTOS\",\"os_version\":\"Unknown\",\"FW\":\""); payload = xstrcat(payload, (char *)app_desc->version); payload = xstrcat(payload, (char *)"\"}"); if (xSemaphoreTake(xSemaphoreDS18B20, 10) == pdTRUE) { payload = xstrcat(payload, (char *)",\"THB\":{\"temperature\":"); sprintf(buf, "%.3f", ds18b20_state->sensor[0].temperature); payload = xstrcat(payload, buf); payload = xstrcat(payload, (char *)"}"); xSemaphoreGive(xSemaphoreDS18B20); } else { ESP_LOGE(TAG, "publishNode() lock DS18B20 error"); } if (xSemaphoreTake(xSemaphoreWiFi, 25) == pdTRUE) { payload = xstrcat(payload, (char *)",\"net\":{\"address\":\""); payload = xstrcat(payload, wifi_state->STA_ip); payload = xstrcat(payload, (char *)"\",\"ifname\":\"sta\",\"rssi\":"); sprintf(buf, "%d", wifi_state->STA_rssi); payload = xstrcat(payload, buf); payload = xstrcat(payload, (char *)"}"); xSemaphoreGive(xSemaphoreWiFi); } else { ESP_LOGE(TAG, "publishNode() lock WiFi error"); } payload = xstrcat(payload, (char *)"}}"); // Only NBIRTH messages, no NDATA in this project. topic = topic_base((char *)"NBIRTH"); publisher(topic, payload); free(topic); topic = NULL; free(payload); payload = NULL; } void publishLogs(void) { char *topic = NULL, *payload = NULL, buf[64]; for (int i = 0; i < 3; i++) { if (units[i].mode && ! units[i].alarm) { if (xSemaphoreTake(xSemaphoreUnits, 25) == pdTRUE) { payload = payload_header(); payload = xstrcat(payload, (char *)"{\"uuid\":\""); payload = xstrcat(payload, units[i].uuid); payload = xstrcat(payload, (char *)"\",\"temperature\":"); sprintf(buf, "%.3f", units[i].temperature / 1000.0); payload = xstrcat(payload, buf); payload = xstrcat(payload, (char *)",\"pressure\":"); sprintf(buf, "%.3f", units[i].pressure / 1000.0); payload = xstrcat(payload, buf); payload = xstrcat(payload, (char *)"}}"); topic = topic_base((char *)"DLOG"); topic = xstrcat(topic, (char *)"/"); topic = xstrcat(topic, units[i].alias); publisher(topic, payload); free(topic); topic = NULL; free(payload); payload = NULL; xSemaphoreGive(xSemaphoreUnits); } else { ESP_LOGE(TAG, "publishLogs() lock error unit %d", i); } } } } static esp_err_t mqtt_event_handler_cb(esp_mqtt_event_handle_t event) { switch (event->event_id) { case MQTT_EVENT_CONNECTED: ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED"); xEventGroupSetBits(xEventGroupMQTT, TASK_MQTT_CONNECTED); break; case MQTT_EVENT_DISCONNECTED: ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED"); xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECTED); break; case MQTT_EVENT_SUBSCRIBED: ESP_LOGI(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d", event->msg_id); break; case MQTT_EVENT_UNSUBSCRIBED: ESP_LOGI(TAG, "MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event->msg_id); break; case MQTT_EVENT_PUBLISHED: ESP_LOGI(TAG, "MQTT_EVENT_PUBLISHED, msg_id=%d", event->msg_id); if (xSemaphoreTake(xSemaphorePcounter, 10) == pdTRUE) { if (count_pub) { count_pub--; } xSemaphoreGive(xSemaphorePcounter); } else { ESP_LOGE(TAG, "mqtt_event_handler_cb(() lock error event"); } break; case MQTT_EVENT_DATA: ESP_LOGI(TAG, "MQTT_EVENT_DATA"); printf("TOPIC=%.*s\r\n", event->topic_len, event->topic); printf("DATA=%.*s\r\n", event->data_len, event->data); break; case MQTT_EVENT_ERROR: ESP_LOGI(TAG, "MQTT_EVENT_ERROR"); break; case MQTT_EVENT_BEFORE_CONNECT: //ESP_LOGI(TAG, "MQTT_EVENT_BEFORE_CONNECT"); // Configure connection can be here. break; default: ESP_LOGI(TAG, "Other event id:%d", event->event_id); break; } return ESP_OK; } static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data) { mqtt_event_handler_cb(event_data); } /* * Task to read temperature sensors on request. */ void task_mqtt(void *pvParameter) { esp_err_t err; char *uri = NULL, port[11]; ESP_LOGI(TAG, "Starting MQTT task"); xSemaphorePcounter = xSemaphoreCreateMutex(); /* * Initialize Sequence counter from NVS */ err = nvs_open("storage", NVS_READWRITE, &seq_handle); if (err != ESP_OK) { ESP_LOGI(TAG, "Error (%s) opening NVS handle", esp_err_to_name(err)); } else { err = nvs_get_u64(seq_handle, "Sequence_cnt", &Sequence); switch (err) { case ESP_OK: ESP_LOGI(TAG, "Sequence counter from NVS = %lld", Sequence); break; case ESP_ERR_NVS_NOT_FOUND: ESP_LOGI(TAG, "Sequence counter not found"); break; default: ESP_LOGI(TAG, "Error (%s) init Sequence", esp_err_to_name(err)); break; } } /* event handler and event group for the wifi driver */ xEventGroupMQTT = xEventGroupCreate(); EventBits_t uxBits; esp_mqtt_client_config_t mqtt_cfg = { .uri = "mqtt://localhost", }; client = esp_mqtt_client_init(&mqtt_cfg); esp_mqtt_client_register_event(client, ESP_EVENT_ANY_ID, mqtt_event_handler, client); /* * Task loop forever. */ while (1) { uxBits = xEventGroupWaitBits(xEventGroupMQTT, TASK_MQTT_CONNECT | TASK_MQTT_DISCONNECT, pdFALSE, pdFALSE, portMAX_DELAY ); if (uxBits & TASK_MQTT_CONNECT) { if (strlen(config.mqtt_server)) { uri = xstrcpy((char *)"mqtt://"); if (strlen(config.mqtt_user) && strlen(config.mqtt_pwd)) { uri = xstrcat(uri, config.mqtt_user); uri = xstrcat(uri, (char *)":"); uri = xstrcat(uri, config.mqtt_pwd); uri = xstrcat(uri, (char *)"@"); } uri = xstrcat(uri, config.mqtt_server); if (config.mqtt_port != 1883) { uri = xstrcat(uri, (char *)":"); sprintf(port, "%d", config.mqtt_port); uri = xstrcat(uri, port); } } else { uri = xstrcpy((char *)"mqtt://iot.eclipse.org:1883"); } ESP_LOGI(TAG, "Request MQTT connect %s", uri); err = esp_mqtt_client_set_uri(client, uri); if (err != ESP_OK) ESP_LOGE(TAG, "Set uri %s", esp_err_to_name(err)); err = esp_mqtt_client_start(client); if (err != ESP_OK) ESP_LOGE(TAG, "Result %s", esp_err_to_name(err)); if (uri) free(uri); uri = NULL; xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECT); } else if (uxBits & TASK_MQTT_DISCONNECT) { ESP_LOGI(TAG, "Request MQTT disconnect"); esp_mqtt_client_stop(client); err = nvs_commit(seq_handle); if (err != ESP_OK) ESP_LOGE(TAG, "Error %s commit NVS", esp_err_to_name(err)); xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_DISCONNECT); xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECTED); } } }