main/task_mqtt.c

Tue, 08 Oct 2019 12:00:31 +0200

author
Michiel Broek <mbroek@mbse.eu>
date
Tue, 08 Oct 2019 12:00:31 +0200
changeset 0
88d965579617
child 1
1082183cd6bb
permissions
-rw-r--r--

Initial import of the CO2 meter application.

/**
 * @file task_mqtt.c
 * @brief The FreeRTOS task to maintain MQTT connections.
 */


#include "config.h"


static const char		*TAG = "task_mqtt";	///< Tag passed to the ESP_LOG macros in this file

EventGroupHandle_t		xEventGroupMQTT;	///< Events MQTT task

uint64_t			Sequence = 0;		///< Sequence stored in NVS
nvs_handle_t			seq_handle;		///< NVS handle

esp_mqtt_client_handle_t	client;			///< MQTT client handle
const int TASK_MQTT_CONNECT = BIT0;			///< Request MQTT connection
const int TASK_MQTT_DISCONNECT = BIT1;			///< Request MQTT disconnect
const int TASK_MQTT_CONNECTED = BIT2;			///< MQTT is connected

const char			*sensState[] = { "Ok", "Error" };	///< Sensor state names, indexed by the unit temperature/pressure state
const char			*unitMode[] = { "Off", "On" };		///< Unit mode names, indexed by the unit mode

extern DS18B20_State            *ds18b20_state;         ///< DS18B20 state
extern SemaphoreHandle_t        xSemaphoreDS18B20;      ///< DS18B20 lock semaphore
extern ADC_State		*adc_state;		///< ADC state
extern SemaphoreHandle_t	xSemaphoreADC;		///< ADC lock semaphore
extern WIFI_State		*wifi_state;		///< WiFi state
extern SemaphoreHandle_t	xSemaphoreWiFi;		///< WiFi lock semaphore

extern unit_t			units[3];		///< The three units, defined elsewhere
extern const esp_app_desc_t	*app_desc;		///< Application description, its version is published in NBIRTH



/**
 * @brief Ask the MQTT task to connect or disconnect.
 * @param state true sets the connect request bit, false sets the
 *              disconnect request bit in the MQTT event group.
 */
void connect_mqtt(bool state)
{
    xEventGroupSetBits(xEventGroupMQTT,
                       state ? TASK_MQTT_CONNECT : TASK_MQTT_DISCONNECT);
}



/**
 * @brief Report whether the MQTT connection is up.
 * @return true when the TASK_MQTT_CONNECTED bit is set in the MQTT
 *         event group, false otherwise.
 */
bool ready_mqtt(void)
{
    return (xEventGroupGetBits(xEventGroupMQTT) & TASK_MQTT_CONNECTED) != 0;
}



/**
 * @brief Build the start of a JSON payload: {"seq":N,"metric":
 *
 * The current value of Sequence is written into the payload, then
 * Sequence is incremented and the new value is written to NVS under the
 * key "Sequence_cnt". The write is not committed here; task_mqtt() calls
 * nvs_commit() when it handles a disconnect request.
 *
 * @return String built with xstrcpy()/xstrcat(). The callers in this file
 *         release it with free().
 */
char *payload_header(void)
{
    char        *tmp, buf[24];	/* 20 digits of a 64 bit value plus NUL fit */
    esp_err_t   err;

    tmp = xstrcpy((char *)"{\"seq\":");
    /* Sequence is unsigned, print it as such with a matching argument type. */
    snprintf(buf, sizeof(buf), "%llu", (unsigned long long)Sequence++);
    err = nvs_set_u64(seq_handle, "Sequence_cnt", Sequence);
    if (err != ESP_OK) {
        ESP_LOGE(TAG, "Error %s write Sequence to NVS", esp_err_to_name(err));
    }
    tmp = xstrcat(tmp, buf);
    tmp = xstrcat(tmp, (char *)",\"metric\":");
    return tmp;
}



/**
 * @brief Build the MQTT topic "mbv1.0/pressure/<msgtype>/<hostname>".
 * @param msgtype The message type part of the topic.
 * @return String built with xstrcpy()/xstrcat(). The callers in this file
 *         release it with free().
 */
char *topic_base(char *msgtype)
{
    char *path = xstrcpy((char *)"mbv1.0/pressure/");

    path = xstrcat(xstrcat(path, msgtype), (char *)"/");
    return xstrcat(path, config.hostname);
}



/**
 * @brief Publish a message on the global MQTT client with QoS 1, not retained.
 * @param topic   The topic to publish on.
 * @param payload The message text, or NULL to publish an empty message.
 */
void publisher(char *topic, char *payload)
{
    /* A missing payload is sent as a NULL pointer with length zero. */
    const int length = payload ? (int)strlen(payload) : 0;

    esp_mqtt_client_publish(client, topic, payload, length, 1, 0);
}



/**
 * @brief Build the JSON object that describes one unit.
 *
 * The object holds the uuid and alias, a "temperature" object (state,
 * address, temperature), a "pressure" object (state, channel, voltage,
 * zero, bar) and the mode. The temperature, voltage, zero and pressure
 * fields are divided by 1000.0 and printed with three decimals.
 *
 * @param i Index into the units array. It is not range checked here; the
 *          caller in this file passes 0..2.
 * @return String built with xstrcpy()/xstrcat(). publishUnits() releases
 *         it with free().
 */
char *unit_data(int i)
{
    char        *payload;
    char        buf[128];

    payload = xstrcpy((char *)"{\"uuid\":\"");
    payload = xstrcat(payload, units[i].uuid);
    payload = xstrcat(payload, (char *)"\",\"alias\":\"");
    payload = xstrcat(payload, units[i].alias);

    /* temperature_state temperature_address temperature */
    payload = xstrcat(payload, (char *)"\",\"temperature\":{\"state\":\"");
    payload = xstrcat(payload, (char *)sensState[units[i].temperature_state]);
    payload = xstrcat(payload, (char *)"\",\"address\":\"");
    payload = xstrcat(payload, (char *)units[i].temperature_address);
    payload = xstrcat(payload, (char *)"\",\"temperature\":");
    /* snprintf keeps every formatted number inside buf. */
    snprintf(buf, sizeof(buf), "%.3f", units[i].temperature / 1000.0);
    payload = xstrcat(payload, buf);

    /* pressure_state pressure_channel pressure_voltage pressure_zero pressure */
    payload = xstrcat(payload, (char *)"},\"pressure\":{\"state\":\"");
    payload = xstrcat(payload, (char *)sensState[units[i].pressure_state]);
    payload = xstrcat(payload, (char *)"\",\"channel\":");
    snprintf(buf, sizeof(buf), "%d", units[i].pressure_channel);
    payload = xstrcat(payload, buf);
    payload = xstrcat(payload, (char *)",\"voltage\":");
    snprintf(buf, sizeof(buf), "%.3f", units[i].pressure_voltage / 1000.0);
    payload = xstrcat(payload, buf);
    payload = xstrcat(payload, (char *)",\"zero\":");
    snprintf(buf, sizeof(buf), "%.3f", units[i].pressure_zero / 1000.0);
    payload = xstrcat(payload, buf);
    payload = xstrcat(payload, (char *)",\"bar\":");
    snprintf(buf, sizeof(buf), "%.3f", units[i].pressure / 1000.0);
    payload = xstrcat(payload, buf);

    payload = xstrcat(payload, (char *)"},\"mode\":\"");
    payload = xstrcat(payload, (char *)unitMode[units[i].mode]);
    payload = xstrcat(payload, (char *)"\"}");
    return payload;
}



/**
 * @brief Publish the DBIRTH message with the data of all three units.
 *
 * The payload is the header from payload_header() followed by
 * {"units":[ ... ]} where each array element comes from unit_data().
 */
void publishUnits(void)
{
    char *message = payload_header();

    message = xstrcat(message, (char *)"{\"units\":[");
    for (int idx = 0; idx < 3; idx++) {
        /* Every element except the first is preceded by a separator. */
        if (idx > 0) {
            message = xstrcat(message, (char *)",");
        }
        char *unit_json = unit_data(idx);
        message = xstrcat(message, unit_json);
        free(unit_json);
    }
    message = xstrcat(message, (char *)"]}}");

    char *dest = topic_base((char *)"DBIRTH");
    publisher(dest, message);
    free(dest);
    free(message);
}



/**
 * @brief Publish the NBIRTH message that describes this node.
 *
 * The payload is the header from payload_header() followed by an object
 * with the node uuid and a "properties" object (hardware model text built
 * from esp_chip_info(), OS name and the firmware version from app_desc).
 * A "THB" object with the bottle temperature and a "net" object with the
 * station address and rssi are added only when the matching semaphore can
 * be taken within 10 ticks; otherwise that object is left out.
 */
void publishNode(void)
{
    char                *topic = NULL, *payload = NULL, buf[64];
    esp_chip_info_t     chip_info;

    esp_chip_info(&chip_info);
    payload = payload_header();
    payload = xstrcat(payload, (char *)"{\"uuid\":\"");
    payload = xstrcat(payload, config.uuid);
    payload = xstrcat(payload, (char *)"\",");
    payload = xstrcat(payload, (char *)"\"properties\":{\"hardwaremake\":\"Unknown\",\"hardwaremodel\":\"");
    /* snprintf keeps the formatted text inside the 64 byte buffer. */
    snprintf(buf, sizeof(buf), "ESP32 %d CPU WiFi%s%s rev %d", chip_info.cores,
             (chip_info.features & CHIP_FEATURE_BT) ? "/BT" : "",
             (chip_info.features & CHIP_FEATURE_BLE) ? "/BLE" : "",
             chip_info.revision);
    payload = xstrcat(payload, buf);
    payload = xstrcat(payload, (char *)"\",\"os\":\"FreeRTOS\",\"os_version\":\"Unknown\",\"FW\":\"");
    payload = xstrcat(payload, (char *)app_desc->version);
    payload = xstrcat(payload, (char *)"\"}");

    if (xSemaphoreTake(xSemaphoreDS18B20, 10) == pdTRUE) {
        payload = xstrcat(payload, (char *)",\"THB\":{\"temperature\":");
        snprintf(buf, sizeof(buf), "%.3f", ds18b20_state->bottle_temperature);
        payload = xstrcat(payload, buf);
        payload = xstrcat(payload, (char *)"}");
        xSemaphoreGive(xSemaphoreDS18B20);
    }

    if (xSemaphoreTake(xSemaphoreWiFi, 10) == pdTRUE) {
        payload = xstrcat(payload, (char *)",\"net\":{\"address\":\"");
        payload = xstrcat(payload, wifi_state->STA_ip);
        payload = xstrcat(payload, (char *)"\",\"ifname\":\"sta\",\"rssi\":");
        snprintf(buf, sizeof(buf), "%d", wifi_state->STA_rssi);
        payload = xstrcat(payload, buf);
        payload = xstrcat(payload, (char *)"}");
        xSemaphoreGive(xSemaphoreWiFi);
    }

    payload = xstrcat(payload, (char *)"}}");
    /* Only NBIRTH messages, no NDATA in this project. */
    topic = topic_base((char *)"NBIRTH");
    publisher(topic, payload);
    free(topic);
    topic = NULL;
    free(payload);
    payload = NULL;
}



/**
 * @brief Handle one MQTT client event.
 *
 * Connect and disconnect events set or clear the TASK_MQTT_CONNECTED bit
 * in the MQTT event group. All other events are only logged; received
 * data is also printed on the console.
 *
 * @param event The event as delivered by the MQTT client.
 * @return Always ESP_OK.
 */
static esp_err_t mqtt_event_handler_cb(esp_mqtt_event_handle_t event)
{
    switch (event->event_id) {

    /* Connection state changes are mirrored in the event group. */
    case MQTT_EVENT_BEFORE_CONNECT: {
        ESP_LOGI(TAG, "MQTT_EVENT_BEFORE_CONNECT");
        /* Configure connection can be here. */
        break;
    }

    case MQTT_EVENT_CONNECTED: {
        ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED");
        xEventGroupSetBits(xEventGroupMQTT, TASK_MQTT_CONNECTED);
        /* Subscriptions to the NCMD and DCMD command topics are currently disabled. */
        break;
    }

    case MQTT_EVENT_DISCONNECTED: {
        ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED");
        xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECTED);
        break;
    }

    /* Events that carry a message id are logged with that id. */
    case MQTT_EVENT_PUBLISHED: {
        ESP_LOGI(TAG, "MQTT_EVENT_PUBLISHED, msg_id=%d", event->msg_id);
        break;
    }

    case MQTT_EVENT_SUBSCRIBED: {
        ESP_LOGI(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d", event->msg_id);
        break;
    }

    case MQTT_EVENT_UNSUBSCRIBED: {
        ESP_LOGI(TAG, "MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event->msg_id);
        break;
    }

    /* Topic and data are length counted, hence the %.*s conversions. */
    case MQTT_EVENT_DATA: {
        ESP_LOGI(TAG, "MQTT_EVENT_DATA");
        printf("TOPIC=%.*s\r\n", event->topic_len, event->topic);
        printf("DATA=%.*s\r\n", event->data_len, event->data);
        break;
    }

    case MQTT_EVENT_ERROR: {
        ESP_LOGI(TAG, "MQTT_EVENT_ERROR");
        break;
    }

    default: {
        ESP_LOGI(TAG, "Other event id:%d", event->event_id);
        break;
    }
    }

    return ESP_OK;
}



/**
 * @brief Event loop handler registered for the MQTT client in task_mqtt().
 *
 * Logs the event base and id, then passes the event data on to
 * mqtt_event_handler_cb().
 *
 * @param handler_args Argument given at registration, not used.
 * @param base         Event base, logged as a string.
 * @param event_id     Event id, logged only.
 * @param event_data   The MQTT event, forwarded to mqtt_event_handler_cb().
 */
static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data) {
    (void)handler_args;
    /* int32_t is not int on every toolchain; cast so the argument matches %d. */
    ESP_LOGI(TAG, "Event dispatched from event loop base=%s, event_id=%d", base, (int)event_id);
    mqtt_event_handler_cb(event_data);
}




/*
 * Task to read temperature sensors on request.
 */
void task_mqtt(void *pvParameter)
{
    esp_err_t	err;

    ESP_LOGI(TAG, "Starting MQTT task");

    /*
     * Initialize Sequence counter from NVS
     */
    err = nvs_open("storage", NVS_READWRITE, &seq_handle);
    if (err != ESP_OK) {
        ESP_LOGI(TAG, "Error (%s) opening NVS handle", esp_err_to_name(err));
    } else {
	err = nvs_get_u64(seq_handle, "Sequence_cnt", &Sequence);
	switch (err) {
	    case ESP_OK:
		ESP_LOGI(TAG, "Sequence counter from NVS = %lld", Sequence);
		break;

	    case ESP_ERR_NVS_NOT_FOUND:
		ESP_LOGI(TAG, "Sequence counter not found");
		break;

	    default:
		ESP_LOGI(TAG, "Error (%s) init Sequence", esp_err_to_name(err));
		break;
	}
    }

    /* event handler and event group for the wifi driver */
    xEventGroupMQTT = xEventGroupCreate();
    EventBits_t uxBits;
    esp_mqtt_client_config_t mqtt_cfg = {
        .uri = "mqtt://seaport.mbse.ym",
    };
    /*esp_mqtt_client_handle_t */ client = esp_mqtt_client_init(&mqtt_cfg);
    esp_mqtt_client_register_event(client, ESP_EVENT_ANY_ID, mqtt_event_handler, client);

    /*
     * Task loop forever.
     */
    while (1) {

	uxBits = xEventGroupWaitBits(xEventGroupMQTT, TASK_MQTT_CONNECT | TASK_MQTT_DISCONNECT, pdFALSE, pdFALSE, portMAX_DELAY );

	if (uxBits & TASK_MQTT_CONNECT) {
	    ESP_LOGI(TAG, "Request MQTT connect");
	    err = esp_mqtt_client_start(client);
	    ESP_LOGI(TAG, "Result %s", esp_err_to_name(err));
	    xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECT);

	} else if (uxBits & TASK_MQTT_DISCONNECT) {
	    ESP_LOGI(TAG, "Request MQTT disconnect");
	    // publish disconnect messages
	    err = esp_mqtt_client_stop(client);
	    ESP_LOGI(TAG, "Result %s", esp_err_to_name(err));

	    err = nvs_commit(seq_handle);
	    if (err != ESP_OK)
		ESP_LOGE(TAG, "Error %s commit NVS", esp_err_to_name(err));
	    xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_DISCONNECT);
	    xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECTED);
	}
	vTaskDelay( (TickType_t)10);
    }
}

mercurial