diff -r 000000000000 -r 88d965579617 main/task_mqtt.c --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/main/task_mqtt.c Tue Oct 08 12:00:31 2019 +0200 @@ -0,0 +1,359 @@ +/** + * @file task_mqtt.c + * @brief The FreeRTOS task to maintain MQTT connections. + */ + + +#include "config.h" + + +static const char *TAG = "task_mqtt"; + +EventGroupHandle_t xEventGroupMQTT; ///< Events MQTT task + +uint64_t Sequence = 0; ///< Sequence stored in NVS +nvs_handle_t seq_handle; ///< NVS handle + +esp_mqtt_client_handle_t client; ///< MQTT client handle +const int TASK_MQTT_CONNECT = BIT0; ///< Request MQTT connection +const int TASK_MQTT_DISCONNECT = BIT1; ///< Request MQTT disconnect +const int TASK_MQTT_CONNECTED = BIT2; ///< MQTT is connected + +const char *sensState[] = { "Ok", "Error" }; +const char *unitMode[] = { "Off", "On" }; + +extern DS18B20_State *ds18b20_state; ///< DS18B20 state +extern SemaphoreHandle_t xSemaphoreDS18B20; ///< DS18B20 lock semaphore +extern ADC_State *adc_state; ///< ADC state +extern SemaphoreHandle_t xSemaphoreADC; ///< ADC lock semaphore +extern WIFI_State *wifi_state; ///< WiFi state +extern SemaphoreHandle_t xSemaphoreWiFi; ///< WiFi lock semaphore + +extern unit_t units[3]; +extern const esp_app_desc_t *app_desc; + + + +void connect_mqtt(bool state) +{ + if (state) + xEventGroupSetBits(xEventGroupMQTT, TASK_MQTT_CONNECT); + else + xEventGroupSetBits(xEventGroupMQTT, TASK_MQTT_DISCONNECT); +} + + + +bool ready_mqtt(void) +{ + if (xEventGroupGetBits(xEventGroupMQTT) & TASK_MQTT_CONNECTED) + return true; + return false; +} + + + +char *payload_header(void) +{ + char *tmp, buf[128]; + esp_err_t err; + + tmp = xstrcpy((char *)"{\"seq\":"); + sprintf(buf, "%lld", Sequence++); + err = nvs_set_u64(seq_handle, "Sequence_cnt", Sequence); + if (err != ESP_OK) + ESP_LOGE(TAG, "Error %s write Sequence to NVS", esp_err_to_name(err)); + tmp = xstrcat(tmp, buf); + tmp = xstrcat(tmp, (char *)",\"metric\":"); + return tmp; +} + + + +char *topic_base(char *msgtype) +{ + char *tmp; + + tmp = xstrcpy((char *)"mbv1.0/pressure/"); + tmp = xstrcat(tmp, msgtype); + tmp = xstrcat(tmp, (char *)"/"); + tmp = xstrcat(tmp, config.hostname); + return tmp; +} + + + +void publisher(char *topic, char *payload) +{ + // publish the data + if (payload) + esp_mqtt_client_publish(client, topic, payload, strlen(payload), 1, 0); + else + esp_mqtt_client_publish(client, topic, NULL, 0, 1, 0); +} + + + +char *unit_data(int i) +{ + char *payload = NULL; + char buf[128]; + + payload = xstrcpy((char *)"{\"uuid\":\""); + payload = xstrcat(payload, units[i].uuid); + payload = xstrcat(payload, (char *)"\",\"alias\":\""); + payload = xstrcat(payload, units[i].alias); + + // temperature_state temperature_address temperature + payload = xstrcat(payload, (char *)"\",\"temperature\":{\"state\":\""); + payload = xstrcat(payload, (char *)sensState[units[i].temperature_state]); + payload = xstrcat(payload, (char *)"\",\"address\":\""); + payload = xstrcat(payload, (char *)units[i].temperature_address); + payload = xstrcat(payload, (char *)"\",\"temperature\":"); + sprintf(buf, "%.3f", units[i].temperature / 1000.0); + payload = xstrcat(payload, buf); + + // pressure_state pressure_channel pressure_voltage pressure_zero pressure + payload = xstrcat(payload, (char *)"},\"pressure\":{\"state\":\""); + payload = xstrcat(payload, (char *)sensState[units[i].pressure_state]); + payload = xstrcat(payload, (char *)"\",\"channel\":"); + sprintf(buf, "%d", units[i].pressure_channel); + payload = xstrcat(payload, buf); + payload = xstrcat(payload, (char *)",\"voltage\":"); + sprintf(buf, "%.3f", units[i].pressure_voltage / 1000.0); + payload = xstrcat(payload, buf); + payload = xstrcat(payload, (char *)",\"zero\":"); + sprintf(buf, "%.3f", units[i].pressure_zero / 1000.0); + payload = xstrcat(payload, buf); + payload = xstrcat(payload, (char *)",\"bar\":"); + sprintf(buf, "%.3f", units[i].pressure / 1000.0); + payload = xstrcat(payload, buf); + payload = xstrcat(payload, (char *)"},\"mode\":\""); + payload = xstrcat(payload, (char *)unitMode[units[i].mode]); + payload = xstrcat(payload, (char *)"\"}"); + return payload; +} + + + +void publishUnits(void) +{ + char *topic = NULL, *payload = NULL, *payloadu = NULL; + int i; + bool comma = false; + + payload = payload_header(); + payload = xstrcat(payload, (char *)"{\"units\":["); + for (i = 0; i < 3; i++) { + if (comma) + payload = xstrcat(payload, (char *)","); + payloadu = unit_data(i); + payload = xstrcat(payload, payloadu); + comma = true; + free(payloadu); + payloadu = NULL; + } + payload = xstrcat(payload, (char *)"]}}"); + topic = topic_base((char *)"DBIRTH"); + publisher(topic, payload); + free(topic); + topic = NULL; + free(payload); + payload = NULL; +} + + + +void publishNode(void) +{ + char *topic = NULL, *payload = NULL, buf[64]; + esp_chip_info_t chip_info; + + esp_chip_info(&chip_info); + payload = payload_header(); + payload = xstrcat(payload, (char *)"{\"uuid\":\""); + payload = xstrcat(payload, config.uuid); + payload = xstrcat(payload, (char *)"\","); + payload = xstrcat(payload, (char *)"\"properties\":{\"hardwaremake\":\"Unknown\",\"hardwaremodel\":\""); + sprintf(buf, "ESP32 %d CPU WiFi%s%s rev %d", chip_info.cores, + (chip_info.features & CHIP_FEATURE_BT) ? "/BT" : "", + (chip_info.features & CHIP_FEATURE_BLE) ? "/BLE" : "", + chip_info.revision); + payload = xstrcat(payload, buf); + payload = xstrcat(payload, (char *)"\",\"os\":\"FreeRTOS\",\"os_version\":\"Unknown\",\"FW\":\""); + payload = xstrcat(payload, (char *)app_desc->version); + payload = xstrcat(payload, (char *)"\"}"); + + if (xSemaphoreTake(xSemaphoreDS18B20, 10) == pdTRUE) { + payload = xstrcat(payload, (char *)",\"THB\":{\"temperature\":"); + sprintf(buf, "%.3f", ds18b20_state->bottle_temperature); + payload = xstrcat(payload, buf); + payload = xstrcat(payload, (char *)"}"); + xSemaphoreGive(xSemaphoreDS18B20); + } + + if (xSemaphoreTake(xSemaphoreWiFi, 10) == pdTRUE) { + payload = xstrcat(payload, (char *)",\"net\":{\"address\":\""); + payload = xstrcat(payload, wifi_state->STA_ip); + payload = xstrcat(payload, (char *)"\",\"ifname\":\"sta\",\"rssi\":"); + sprintf(buf, "%d", wifi_state->STA_rssi); + payload = xstrcat(payload, buf); + payload = xstrcat(payload, (char *)"}"); + xSemaphoreGive(xSemaphoreWiFi); + } + + payload = xstrcat(payload, (char *)"}}"); + // Only NBIRTH messages, no NDATA in this project. + topic = topic_base((char *)"NBIRTH"); + publisher(topic, payload); + free(topic); + topic = NULL; + free(payload); + payload = NULL; +} + + + +static esp_err_t mqtt_event_handler_cb(esp_mqtt_event_handle_t event) +{ + switch (event->event_id) { + + case MQTT_EVENT_CONNECTED: + ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED"); + xEventGroupSetBits(xEventGroupMQTT, TASK_MQTT_CONNECTED); + +// topic = topic_base((char *)"NCMD"); +// topic = xstrcat(topic, (char *)"/#"); +// ESP_LOGI(TAG, "Subscribe %s", topic); +// msg_id = esp_mqtt_client_subscribe(client, topic, 0); +// ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id); +// free(topic); + +// topic = topic_base((char *)"DCMD"); +// topic = xstrcat(topic, (char *)"/#"); +// ESP_LOGI(TAG, "Subscribe %s", topic); +// msg_id = esp_mqtt_client_subscribe(client, topic, 1); +// ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id); +// free(topic); +// topic = NULL; + break; + + case MQTT_EVENT_DISCONNECTED: + ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED"); + xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECTED); + break; + + case MQTT_EVENT_SUBSCRIBED: + ESP_LOGI(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d", event->msg_id); + break; + + case MQTT_EVENT_UNSUBSCRIBED: + ESP_LOGI(TAG, "MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event->msg_id); + break; + + case MQTT_EVENT_PUBLISHED: + ESP_LOGI(TAG, "MQTT_EVENT_PUBLISHED, msg_id=%d", event->msg_id); + break; + + case MQTT_EVENT_DATA: + ESP_LOGI(TAG, "MQTT_EVENT_DATA"); + printf("TOPIC=%.*s\r\n", event->topic_len, event->topic); + printf("DATA=%.*s\r\n", event->data_len, event->data); + break; + + case MQTT_EVENT_ERROR: + ESP_LOGI(TAG, "MQTT_EVENT_ERROR"); + break; + + case MQTT_EVENT_BEFORE_CONNECT: + ESP_LOGI(TAG, "MQTT_EVENT_BEFORE_CONNECT"); + // Configure connection can be here. + break; + + default: + ESP_LOGI(TAG, "Other event id:%d", event->event_id); + break; + } + return ESP_OK; +} + + + +static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data) { + ESP_LOGI(TAG, "Event dispatched from event loop base=%s, event_id=%d", base, event_id); + mqtt_event_handler_cb(event_data); +} + + + + +/* + * Task to read temperature sensors on request. + */ +void task_mqtt(void *pvParameter) +{ + esp_err_t err; + + ESP_LOGI(TAG, "Starting MQTT task"); + + /* + * Initialize Sequence counter from NVS + */ + err = nvs_open("storage", NVS_READWRITE, &seq_handle); + if (err != ESP_OK) { + ESP_LOGI(TAG, "Error (%s) opening NVS handle", esp_err_to_name(err)); + } else { + err = nvs_get_u64(seq_handle, "Sequence_cnt", &Sequence); + switch (err) { + case ESP_OK: + ESP_LOGI(TAG, "Sequence counter from NVS = %lld", Sequence); + break; + + case ESP_ERR_NVS_NOT_FOUND: + ESP_LOGI(TAG, "Sequence counter not found"); + break; + + default: + ESP_LOGI(TAG, "Error (%s) init Sequence", esp_err_to_name(err)); + break; + } + } + + /* event handler and event group for the wifi driver */ + xEventGroupMQTT = xEventGroupCreate(); + EventBits_t uxBits; + esp_mqtt_client_config_t mqtt_cfg = { + .uri = "mqtt://seaport.mbse.ym", + }; + /*esp_mqtt_client_handle_t */ client = esp_mqtt_client_init(&mqtt_cfg); + esp_mqtt_client_register_event(client, ESP_EVENT_ANY_ID, mqtt_event_handler, client); + + /* + * Task loop forever. + */ + while (1) { + + uxBits = xEventGroupWaitBits(xEventGroupMQTT, TASK_MQTT_CONNECT | TASK_MQTT_DISCONNECT, pdFALSE, pdFALSE, portMAX_DELAY ); + + if (uxBits & TASK_MQTT_CONNECT) { + ESP_LOGI(TAG, "Request MQTT connect"); + err = esp_mqtt_client_start(client); + ESP_LOGI(TAG, "Result %s", esp_err_to_name(err)); + xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECT); + + } else if (uxBits & TASK_MQTT_DISCONNECT) { + ESP_LOGI(TAG, "Request MQTT disconnect"); + // publish disconnect messages + err = esp_mqtt_client_stop(client); + ESP_LOGI(TAG, "Result %s", esp_err_to_name(err)); + + err = nvs_commit(seq_handle); + if (err != ESP_OK) + ESP_LOGE(TAG, "Error %s commit NVS", esp_err_to_name(err)); + xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_DISCONNECT); + xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECTED); + } + vTaskDelay( (TickType_t)10); + } +} +