main/task_mqtt.c

Mon, 15 Jun 2020 13:38:46 +0200

author
Michiel Broek <mbroek@mbse.eu>
date
Mon, 15 Jun 2020 13:38:46 +0200
changeset 60
07a1a07fdc8c
parent 57
232f318a6b51
child 69
5437e0514d59
permissions
-rw-r--r--

write_units now overwrites instead of truncate/write the records. Add ssid to the node mqtt message. Lower rotary log messages. Removed most menu log messages.

/**
 * @file task_mqtt.c
 * @brief The FreeRTOS task to maintain MQTT connections.
 */


#include "config.h"


static const char		*TAG = "task_mqtt";	///< Tag passed to the ESP_LOGx macros in this file

EventGroupHandle_t		xEventGroupMQTT;	///< Events MQTT task
SemaphoreHandle_t		xSemaphorePcounter;	///< Publish counter semaphore.
int				count_pub = 0;		///< Outstanding published messages.
esp_mqtt_client_handle_t	client;			///< MQTT client handle

const int TASK_MQTT_CONNECT = BIT0;			///< Request MQTT connection
const int TASK_MQTT_DISCONNECT = BIT1;			///< Request MQTT disconnect
const int TASK_MQTT_CONNECTED = BIT2;			///< MQTT is connected

const char			*sensState[] = { "OK", "ERROR" };	///< Sensor state strings
const char			*unitMode[] = { "OFF", "ON" };		///< Units state strings

extern DS18B20_State            *ds18b20_state;         ///< DS18B20 state
extern SemaphoreHandle_t        xSemaphoreDS18B20;      ///< DS18B20 lock semaphore
extern ADC_State		*adc_state;		///< ADC state
extern SemaphoreHandle_t	xSemaphoreADC;		///< ADC lock semaphore
extern WIFI_State		*wifi_state;		///< WiFi state
extern SemaphoreHandle_t	xSemaphoreWiFi;		///< WiFi lock semaphore
extern unit_t			units[3];		///< Unit records published by this file
extern SemaphoreHandle_t	xSemaphoreUnits;	///< Units lock semaphore
extern const esp_app_desc_t	*app_desc;		///< Application description; its version is published in the node message



/**
 * @brief Post a connect or disconnect request to the MQTT event group.
 * @param state true sets TASK_MQTT_CONNECT, false sets TASK_MQTT_DISCONNECT.
 */
void connect_mqtt(bool state)
{
    const int request = state ? TASK_MQTT_CONNECT : TASK_MQTT_DISCONNECT;

    xEventGroupSetBits(xEventGroupMQTT, request);
}



/**
 * @brief Tell whether the MQTT connected flag is set.
 * @return true if TASK_MQTT_CONNECTED is set in the MQTT event group, else false.
 */
bool ready_mqtt(void)
{
    return (xEventGroupGetBits(xEventGroupMQTT) & TASK_MQTT_CONNECTED) != 0;
}



/**
 * @brief Start a new mqtt payload with the opening of the "metric" object.
 * @return String obtained from xstrcpy() holding the header; the callers
 *         in this file release it with free().
 */
char *payload_header(void)
{
    return xstrcpy((char *)"{\"metric\":");
}



/**
 * @brief Build the common first part of a topic:
 *        "mbv1.0/co2meters/<msgtype>/<hostname>".
 * @param msgtype The message type part of the topic.
 * @return The topic string; the callers in this file release it with free().
 */
char *topic_base(char *msgtype)
{
    char *topic = xstrcat(xstrcpy((char *)"mbv1.0/co2meters/"), msgtype);

    topic = xstrcat(topic, (char *)"/");
    return xstrcat(topic, config.hostname);
}



/**
 * @brief The mqtt generic publish function.
 *
 * Raises the outstanding publish counter and then hands the message to the
 * MQTT client with QoS 1 and retain off. A negative result from the client
 * is logged; the counter is left as it is in that case.
 *
 * @param topic The topic of the mqtt message.
 * @param payload The payload of the mqtt message, or NULL to publish an
 *                empty message.
 */
void publisher(char *topic, char *payload)
{
    int msg_id;

    /*
     * First count, then send the data. The MQTT_EVENT_PUBLISHED handler
     * lowers the counter again.
     */
    if (xSemaphoreTake(xSemaphorePcounter, 10) == pdTRUE) {
        count_pub++;
        xSemaphoreGive(xSemaphorePcounter);
    } else {
        ESP_LOGE(TAG, "publisher() counter lock");
    }

    if (payload)
        msg_id = esp_mqtt_client_publish(client, topic, payload, strlen(payload), 1, 0);
    else
        msg_id = esp_mqtt_client_publish(client, topic, NULL, 0, 1, 0);

    /*
     * Do not drop the failure silently: without this a counter that never
     * returns to zero cannot be traced back to the rejected message.
     */
    if (msg_id < 0)
        ESP_LOGE(TAG, "publisher() publish to %s failed (%d)", topic ? topic : "(null)", msg_id);
}



/**
 * @brief Generate json data for the given unit.
 *
 * The record is read while holding the units lock. All numeric fields are
 * formatted with a bounded snprintf().
 *
 * @param i The unit record number, must be a valid index into units[].
 * @return The json string allocated in memory, or NULL if i is out of
 *         range or the units lock could not be taken. The caller frees it.
 */
char *unit_data(int i)
{
    char        *payload = NULL;
    char        buf[128];

    /* Reject indexes outside the units array instead of reading past it. */
    if (i < 0 || i >= (int)(sizeof(units) / sizeof(units[0]))) {
        ESP_LOGE(TAG, "unit_data(%d) invalid unit", i);
        return NULL;
    }

    if (xSemaphoreTake(xSemaphoreUnits, 25) == pdTRUE) {
        payload = xstrcpy((char *)"{\"uuid\":\"");
        payload = xstrcat(payload, units[i].uuid);
        payload = xstrcat(payload, (char *)"\",\"alias\":\"");
        payload = xstrcat(payload, units[i].alias);
        payload = xstrcat(payload, (char *)"\",\"mode\":\"");
        payload = xstrcat(payload, (char *)unitMode[units[i].mode]);
        payload = xstrcat(payload, (char *)"\",\"alarm\":");
        snprintf(buf, sizeof(buf), "%d", units[i].alarm);
        payload = xstrcat(payload, buf);

        // temperature_state temperature_address temperature
        payload = xstrcat(payload, (char *)",\"temperature\":{\"state\":\"");
        payload = xstrcat(payload, (char *)sensState[units[i].temperature_state]);
        payload = xstrcat(payload, (char *)"\",\"address\":\"");
        payload = xstrcat(payload, (char *)units[i].temperature_rom_code);
        payload = xstrcat(payload, (char *)"\",\"temperature\":");
        snprintf(buf, sizeof(buf), "%.3f", units[i].temperature / 1000.0);
        payload = xstrcat(payload, buf);

        // pressure_state pressure_channel pressure_voltage pressure_zero pressure
        payload = xstrcat(payload, (char *)"},\"pressure\":{\"state\":\"");
        payload = xstrcat(payload, (char *)sensState[units[i].pressure_state]);
        payload = xstrcat(payload, (char *)"\",\"channel\":");
        snprintf(buf, sizeof(buf), "%d", units[i].pressure_channel);
        payload = xstrcat(payload, buf);
        payload = xstrcat(payload, (char *)",\"voltage\":");
        snprintf(buf, sizeof(buf), "%.3f", units[i].pressure_voltage / 1000.0);
        payload = xstrcat(payload, buf);
        payload = xstrcat(payload, (char *)",\"zero\":");
        snprintf(buf, sizeof(buf), "%.3f", units[i].pressure_zero / 1000.0);
        payload = xstrcat(payload, buf);
        payload = xstrcat(payload, (char *)",\"bar\":");
        snprintf(buf, sizeof(buf), "%.2f", units[i].pressure / 1000.0);
        payload = xstrcat(payload, buf);
        payload = xstrcat(payload, (char *)"}}");
        xSemaphoreGive(xSemaphoreUnits);
    } else {
        ESP_LOGE(TAG, "unit_data(%d) lock error", i);
    }
    return payload;
}



/**
 * @brief Publish a DBIRTH message with the json data of all units.
 *
 * A unit for which unit_data() returns NULL is left out of the array, so the
 * separating commas always sit between two real elements and the payload
 * stays valid json.
 */
void publishUnits(void)
{
    char        *topic = NULL, *payload = NULL, *payloadu = NULL;
    bool        comma = false;

    payload = payload_header();
    payload = xstrcat(payload, (char *)"{\"units\":[");
    for (int i = 0; i < 3; i++) {
        payloadu = unit_data(i);
        if (payloadu == NULL)
            continue;   /* No data for this unit; unit_data() logged why. */
        if (comma)
            payload = xstrcat(payload, (char *)",");
        payload = xstrcat(payload, payloadu);
        comma = true;
        free(payloadu);
        payloadu = NULL;
    }
    payload = xstrcat(payload, (char *)"]}}");
    topic = topic_base((char *)"DBIRTH");
    publisher(topic, payload);
    free(topic);
    topic = NULL;
    free(payload);
    payload = NULL;
}



/**
 * @brief Publish the NBIRTH message describing this node.
 *
 * The payload holds the node uuid, the main loop interval, hardware and
 * firmware properties, and, when the respective lock can be taken, the
 * first DS18B20 temperature and the WiFi station address, ssid and rssi.
 * A section whose lock times out is left out and an error is logged.
 * Numbers are formatted with a bounded snprintf() so an unexpected value
 * cannot write past the 64 byte scratch buffer.
 */
void publishNode(void)
{
    char                *topic = NULL, *payload = NULL, buf[64];
    esp_chip_info_t     chip_info;

    esp_chip_info(&chip_info);
    payload = payload_header();
    payload = xstrcat(payload, (char *)"{\"uuid\":\"");
    payload = xstrcat(payload, config.uuid);
    payload = xstrcat(payload, (char *)"\",\"interval\":");
    snprintf(buf, sizeof(buf), "%d", MAINLOOP_TIMER);
    payload = xstrcat(payload, buf);
    payload = xstrcat(payload, (char *)",\"properties\":{\"hardwaremake\":\"Unknown\",\"hardwaremodel\":\"");
    snprintf(buf, sizeof(buf), "ESP32 %d cores rev %d, WiFi bgn", chip_info.cores, chip_info.revision);
    payload = xstrcat(payload, buf);
    payload = xstrcat(payload, (char *)"\",\"os\":\"esp-idf\",\"os_version\":\"");
    payload = xstrcat(payload, (char *)esp_get_idf_version());
    payload = xstrcat(payload, (char *)"\",\"FW\":\"");
    payload = xstrcat(payload, (char *)app_desc->version);
    payload = xstrcat(payload, (char *)"\"}");

    if (xSemaphoreTake(xSemaphoreDS18B20, 10) == pdTRUE) {
        payload = xstrcat(payload, (char *)",\"THB\":{\"temperature\":");
        snprintf(buf, sizeof(buf), "%.3f", ds18b20_state->sensor[0].temperature);
        payload = xstrcat(payload, buf);
        payload = xstrcat(payload, (char *)"}");
        xSemaphoreGive(xSemaphoreDS18B20);
    } else {
        ESP_LOGE(TAG, "publishNode() lock DS18B20 error");
    }

    if (xSemaphoreTake(xSemaphoreWiFi, 25) == pdTRUE) {
        payload = xstrcat(payload, (char *)",\"net\":{\"address\":\"");
        payload = xstrcat(payload, wifi_state->STA_ip);
        payload = xstrcat(payload, (char *)"\",\"ifname\":\"sta\",\"ssid\":\"");
        payload = xstrcat(payload, wifi_state->STA_ssid);
        payload = xstrcat(payload, (char *)"\",\"rssi\":");
        snprintf(buf, sizeof(buf), "%d", wifi_state->STA_rssi);
        payload = xstrcat(payload, buf);
        payload = xstrcat(payload, (char *)"}");
        xSemaphoreGive(xSemaphoreWiFi);
    } else {
        ESP_LOGE(TAG, "publishNode() lock WiFi error");
    }

    payload = xstrcat(payload, (char *)"}}");
    // Only NBIRTH messages, no NDATA in this project.
    topic = topic_base((char *)"NBIRTH");
    publisher(topic, payload);
    free(topic);
    topic = NULL;
    free(payload);
    payload = NULL;
}



/**
 * @brief Publish a DLOG message for every unit that is on and not in alarm.
 *
 * For each unit the record is inspected and copied into the payload and
 * topic while holding the units lock. The lock is released before the
 * message is handed to publisher(), so tasks waiting for the units lock are
 * not held up by the MQTT client call. The topic is the DLOG base topic
 * followed by "/" and the unit alias.
 */
void publishLogs(void)
{
    char        *topic = NULL, *payload = NULL, buf[64];

    for (int i = 0; i < 3; i++) {
        bool    send = false;

        if (xSemaphoreTake(xSemaphoreUnits, 25) != pdTRUE) {
            ESP_LOGE(TAG, "publishLogs() lock error unit %d", i);
            continue;
        }

        /* mode and alarm are read under the lock, like the rest of the record. */
        if (units[i].mode && ! units[i].alarm) {
            payload = payload_header();
            payload = xstrcat(payload, (char *)"{\"uuid\":\"");
            payload = xstrcat(payload, units[i].uuid);
            payload = xstrcat(payload, (char *)"\",\"temperature\":");
            snprintf(buf, sizeof(buf), "%.3f", units[i].temperature / 1000.0);
            payload = xstrcat(payload, buf);
            payload = xstrcat(payload, (char *)",\"pressure\":");
            snprintf(buf, sizeof(buf), "%.3f", units[i].pressure / 1000.0);
            payload = xstrcat(payload, buf);
            payload = xstrcat(payload, (char *)"}}");
            topic = topic_base((char *)"DLOG");
            topic = xstrcat(topic, (char *)"/");
            topic = xstrcat(topic, units[i].alias);
            send = true;
        }
        xSemaphoreGive(xSemaphoreUnits);

        if (send) {
            publisher(topic, payload);
            free(topic);
            topic = NULL;
            free(payload);
            payload = NULL;
        }
    }
}



static esp_err_t mqtt_event_handler_cb(esp_mqtt_event_handle_t event)
{
    switch (event->event_id) {

        case MQTT_EVENT_CONNECTED:
            ESP_LOGD(TAG, "MQTT_EVENT_CONNECTED");
	    xEventGroupSetBits(xEventGroupMQTT, TASK_MQTT_CONNECTED);
            break;

        case MQTT_EVENT_DISCONNECTED:
            ESP_LOGD(TAG, "MQTT_EVENT_DISCONNECTED");
	    xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECTED);
            break;

        case MQTT_EVENT_SUBSCRIBED:
            ESP_LOGD(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d", event->msg_id);
            break;

        case MQTT_EVENT_UNSUBSCRIBED:
            ESP_LOGI(TAG, "MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event->msg_id);
            break;

        case MQTT_EVENT_PUBLISHED:
            ESP_LOGD(TAG, "MQTT_EVENT_PUBLISHED, msg_id=%d", event->msg_id);
	    if (xSemaphoreTake(xSemaphorePcounter, 10) == pdTRUE) {
	    	if (count_pub) {
		    count_pub--;
	    	}
		xSemaphoreGive(xSemaphorePcounter);
	    } else {
        	ESP_LOGE(TAG, "mqtt_event_handler_cb(() lock error event");
	    }
            break;

        case MQTT_EVENT_DATA:
            ESP_LOGI(TAG, "MQTT_EVENT_DATA");
            printf("TOPIC=%.*s\r\n", event->topic_len, event->topic);
            printf("DATA=%.*s\r\n", event->data_len, event->data);
            break;

        case MQTT_EVENT_ERROR:
            ESP_LOGI(TAG, "MQTT_EVENT_ERROR");
            break;

	case MQTT_EVENT_BEFORE_CONNECT:
	    //ESP_LOGI(TAG, "MQTT_EVENT_BEFORE_CONNECT");
	    // Configure connection can be here.
	    break;

        default:
            ESP_LOGI(TAG, "Other event id:%d", event->event_id);
            break;
    }
    return ESP_OK;
}



/**
 * @brief Event loop entry point registered with the MQTT client.
 *
 * Forwards the event data to mqtt_event_handler_cb(); the other arguments
 * are not used.
 *
 * @param handler_args Argument given at registration, unused.
 * @param base Event base, unused.
 * @param event_id Event id, unused; the id is taken from the event data.
 * @param event_data The MQTT event.
 */
static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data)
{
    esp_mqtt_event_handle_t event = event_data;

    mqtt_event_handler_cb(event);
}



/**
 * @brief The FreeRTOS task that starts and stops the MQTT client on request.
 *
 * Creates the publish counter mutex and the MQTT event group, initialises
 * the MQTT client and registers the event handler. It then waits forever
 * for TASK_MQTT_CONNECT or TASK_MQTT_DISCONNECT:
 *  - connect: builds the broker uri from config (user and password are only
 *    added when both are set, the port only when it is not 1883), or uses
 *    the built-in default when no server is configured, and starts the
 *    client. The client is not started when the uri is refused.
 *  - disconnect: stops the client and clears TASK_MQTT_CONNECTED.
 *
 * @param pvParameter Unused.
 */
void task_mqtt(void *pvParameter)
{
    esp_err_t   err;
    char        *uri = NULL, port[12];
    EventBits_t uxBits;

    ESP_LOGI(TAG, "Starting MQTT task");
    xSemaphorePcounter = xSemaphoreCreateMutex();

    /* Event group carrying the connect/disconnect requests and the connected flag. */
    xEventGroupMQTT = xEventGroupCreate();
    esp_mqtt_client_config_t mqtt_cfg = {
        .uri = "mqtt://localhost",
    };
    client = esp_mqtt_client_init(&mqtt_cfg);
    esp_mqtt_client_register_event(client, ESP_EVENT_ANY_ID, mqtt_event_handler, client);

    /*
     * Task loop forever.
     */
    while (1) {

        uxBits = xEventGroupWaitBits(xEventGroupMQTT, TASK_MQTT_CONNECT | TASK_MQTT_DISCONNECT, pdFALSE, pdFALSE, portMAX_DELAY );

        if (uxBits & TASK_MQTT_CONNECT) {
            if (strlen(config.mqtt_server)) {
                uri = xstrcpy((char *)"mqtt://");
                if (strlen(config.mqtt_user) && strlen(config.mqtt_pwd)) {
                    uri = xstrcat(uri, config.mqtt_user);
                    uri = xstrcat(uri, (char *)":");
                    uri = xstrcat(uri, config.mqtt_pwd);
                    uri = xstrcat(uri, (char *)"@");
                }
                uri = xstrcat(uri, config.mqtt_server);
                if (config.mqtt_port != 1883) {
                    uri = xstrcat(uri, (char *)":");
                    snprintf(port, sizeof(port), "%d", config.mqtt_port);
                    uri = xstrcat(uri, port);
                }
                /* Log server and port only; the uri may carry the password. */
                ESP_LOGI(TAG, "Request MQTT connect %s:%d", config.mqtt_server, config.mqtt_port);
            } else {
                uri = xstrcpy((char *)"mqtt://iot.eclipse.org:1883");
                ESP_LOGI(TAG, "Request MQTT connect %s", uri);
            }

            err = esp_mqtt_client_set_uri(client, uri);
            if (err != ESP_OK) {
                /* Do not start the client against whatever uri was set before. */
                ESP_LOGE(TAG, "Set uri %s", esp_err_to_name(err));
            } else {
                err = esp_mqtt_client_start(client);
                if (err != ESP_OK)
                    ESP_LOGE(TAG, "Result %s", esp_err_to_name(err));
            }
            free(uri);
            uri = NULL;
            xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECT);

        } else if (uxBits & TASK_MQTT_DISCONNECT) {
            ESP_LOGI(TAG, "Request MQTT disconnect");
            esp_mqtt_client_stop(client);
            xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_DISCONNECT);
            xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECTED);
        }
    }
}

mercurial