main/task_mqtt.c

Tue, 03 Oct 2023 17:24:06 +0200

author
Michiel Broek <mbroek@mbse.eu>
date
Tue, 03 Oct 2023 17:24:06 +0200
changeset 77
15dc572a7fcb
parent 69
5437e0514d59
permissions
-rw-r--r--

Version 0.3.0. Backported network code from the experimental roaming project. Will now connect after reset to the strongest AP. If the signal level drops below -67, extra scans are done to look for a better AP. Nothing is done with the result yet. Removed the config.conf file, all info is taken from the project menu and live tests. Better logging of the board type, which is also sent via json mqtt. Send bssid and current channel too.

/**
 * @file task_mqtt.c
 * @brief The FreeRTOS task to maintain MQTT connections.
 */


#include "config.h"


/* Tag passed to the ESP_LOGx macros in this file. */
static const char		*TAG = "task_mqtt";

EventGroupHandle_t		xEventGroupMQTT;	///< Events MQTT task
SemaphoreHandle_t		xSemaphorePcounter;	///< Publish counter semaphore.
int				count_pub = 0;		///< Outstanding published messages.
esp_mqtt_client_handle_t	client;			///< MQTT client handle

/* Bits used in xEventGroupMQTT. */
const int TASK_MQTT_CONNECT = BIT0;			///< Request MQTT connection
const int TASK_MQTT_DISCONNECT = BIT1;			///< Request MQTT disconnect
const int TASK_MQTT_CONNECTED = BIT2;			///< MQTT is connected
const int TASK_MQTT_STARTED = BIT3;			///< MQTT is started

/* Lookup tables indexed by the state/mode fields of a unit record. */
const char			*sensState[] = { "OK", "ERROR" };	///< Sensor state strings
const char			*unitMode[] = { "OFF", "ON" };		///< Units state strings

/* Objects defined elsewhere in the project. */
extern DS18B20_State            *ds18b20_state;         ///< DS18B20 state
extern SemaphoreHandle_t        xSemaphoreDS18B20;      ///< DS18B20 lock semaphore
extern ADC_State		*adc_state;		///< ADC state
extern SemaphoreHandle_t	xSemaphoreADC;		///< ADC lock semaphore
extern WIFI_State		*wifi_state;		///< WiFi state
extern SemaphoreHandle_t	xSemaphoreWiFi;		///< WiFi lock semaphore
extern unit_t			units[3];		///< Unit records published by this file
extern SemaphoreHandle_t	xSemaphoreUnits;	///< Units lock semaphore
extern const esp_app_desc_t	*app_desc;		///< Application description, its version is published as "FW"
extern char			hostname[];		///< Last part of every MQTT topic
extern char			uuid[];			///< Node uuid sent in the NBIRTH message

void connect_mqtt(bool state)
{
    if (state)
    	xEventGroupSetBits(xEventGroupMQTT, TASK_MQTT_CONNECT);
    else
    	xEventGroupSetBits(xEventGroupMQTT, TASK_MQTT_DISCONNECT);
}



bool ready_mqtt(void)
{
    if (xEventGroupGetBits(xEventGroupMQTT) & TASK_MQTT_CONNECTED)
        return true;
    return false;
}



/**
 * @brief Build the opening part of a mqtt json payload.
 * @return Allocated string holding the header, the caller appends the
 *         rest of the payload and frees the result.
 */
char *payload_header(void)
{
    return xstrcpy((char *)"{\"metric\":");
}



/**
 * @brief Build the base of a mqtt topic: prefix, message type and hostname.
 * @param msgtype The message type part of the topic.
 * @return The topic string allocated in memory, to be freed by the caller.
 */
char *topic_base(char *msgtype)
{
    char        *topic = xstrcpy((char *)"mbv1.0/co2meters/");

    topic = xstrcat(topic, msgtype);
    topic = xstrcat(topic, (char *)"/");
    return xstrcat(topic, hostname);
}



/**
 * @brief The mqtt generic publish function.
 *
 * The message is sent with QoS 1 and without the retain flag. The counter
 * of outstanding messages is raised before sending and lowered again by
 * the MQTT_EVENT_PUBLISHED handler. If the client refuses the message no
 * such event will follow, so the counter is lowered here instead.
 *
 * @param topic The topic of the mqtt message.
 * @param payload The payload of the mqtt message, NULL sends an empty message.
 */
void publisher(char *topic, char *payload)
{
    int         msg_id;
    bool        counted = false;

    /*
     * First count, then send the data, so the counter is already raised
     * when the published event arrives.
     */
    if (xSemaphoreTake(xSemaphorePcounter, 10) == pdTRUE) {
        count_pub++;
        counted = true;
        xSemaphoreGive(xSemaphorePcounter);
    } else {
        ESP_LOGE(TAG, "publisher() counter lock");
    }

    if (payload)
        msg_id = esp_mqtt_client_publish(client, topic, payload, strlen(payload), 1, 0);
    else
        msg_id = esp_mqtt_client_publish(client, topic, NULL, 0, 1, 0);

    if (msg_id < 0) {
        /* Negative result: the message was not accepted by the client. */
        ESP_LOGE(TAG, "publisher() publish %s failed (%d)", topic ? topic : "(null)", msg_id);

        /* Only undo the count if it was really raised above. */
        if (counted) {
            if (xSemaphoreTake(xSemaphorePcounter, 10) == pdTRUE) {
                if (count_pub)
                    count_pub--;
                xSemaphoreGive(xSemaphorePcounter);
            } else {
                ESP_LOGE(TAG, "publisher() counter lock");
            }
        }
    }
}



/**
 * @brief Generate json data for the given unit.
 *
 * The unit record is read while holding the units semaphore.
 *
 * @param i The unit record number, must be a valid index in units[].
 * @return The json string allocated in memory, to be freed by the caller.
 *         NULL if the record number is invalid or the units semaphore
 *         could not be taken.
 */
char *unit_data(int i)
{
    char        *payload = NULL;
    char        buf[128];

    /* Refuse indexes outside the units table instead of reading past it. */
    if (i < 0 || i >= (int)(sizeof(units) / sizeof(units[0]))) {
        ESP_LOGE(TAG, "unit_data(%d) invalid unit", i);
        return NULL;
    }

    if (xSemaphoreTake(xSemaphoreUnits, 25) == pdTRUE) {
        payload = xstrcpy((char *)"{\"uuid\":\"");
        payload = xstrcat(payload, units[i].uuid);
        payload = xstrcat(payload, (char *)"\",\"alias\":\"");
        payload = xstrcat(payload, units[i].alias);
        payload = xstrcat(payload, (char *)"\",\"mode\":\"");
        payload = xstrcat(payload, (char *)unitMode[units[i].mode]);
        payload = xstrcat(payload, (char *)"\",\"alarm\":");
        snprintf(buf, sizeof(buf), "%lu", units[i].alarm);
        payload = xstrcat(payload, buf);

        // temperature_state temperature_address temperature
        payload = xstrcat(payload, (char *)",\"temperature\":{\"state\":\"");
        payload = xstrcat(payload, (char *)sensState[units[i].temperature_state]);
        payload = xstrcat(payload, (char *)"\",\"address\":\"");
        payload = xstrcat(payload, (char *)units[i].temperature_rom_code);
        payload = xstrcat(payload, (char *)"\",\"temperature\":");
        snprintf(buf, sizeof(buf), "%.3f", units[i].temperature / 1000.0);
        payload = xstrcat(payload, buf);

        // pressure_state pressure_channel pressure_voltage pressure_zero pressure
        payload = xstrcat(payload, (char *)"},\"pressure\":{\"state\":\"");
        payload = xstrcat(payload, (char *)sensState[units[i].pressure_state]);
        payload = xstrcat(payload, (char *)"\",\"channel\":");
        snprintf(buf, sizeof(buf), "%d", units[i].pressure_channel);
        payload = xstrcat(payload, buf);
        payload = xstrcat(payload, (char *)",\"voltage\":");
        snprintf(buf, sizeof(buf), "%.3f", units[i].pressure_voltage / 1000.0);
        payload = xstrcat(payload, buf);
        payload = xstrcat(payload, (char *)",\"zero\":");
        snprintf(buf, sizeof(buf), "%.3f", units[i].pressure_zero / 1000.0);
        payload = xstrcat(payload, buf);
        payload = xstrcat(payload, (char *)",\"bar\":");
        snprintf(buf, sizeof(buf), "%.2f", units[i].pressure / 1000.0);
        payload = xstrcat(payload, buf);
        payload = xstrcat(payload, (char *)"}}");
        xSemaphoreGive(xSemaphoreUnits);
    } else {
        ESP_LOGE(TAG, "unit_data(%d) lock error", i);
    }
    return payload;
}



void publishUnits(void)
{
    char        *topic = NULL, *payload = NULL, *payloadu = NULL;
    int		i;
    bool	comma = false;

    payload = payload_header();
    payload = xstrcat(payload, (char *)"{\"units\":[");
    for (i = 0; i < 3; i++) {
	    if (comma)
                payload = xstrcat(payload, (char *)",");
            payloadu = unit_data(i);
            payload = xstrcat(payload, payloadu);
            comma = true;
            free(payloadu);
            payloadu = NULL;
    }
    payload = xstrcat(payload, (char *)"]}}");
    topic = topic_base((char *)"DBIRTH");
    publisher(topic, payload);
    free(topic);
    topic = NULL;
    free(payload);
    payload = NULL;
}



void publishNode(void)
{
    char                *topic = NULL, *payload = NULL, buf[64];
    esp_chip_info_t	chip_info;

    esp_chip_info(&chip_info);
    payload = payload_header();
    payload = xstrcat(payload, (char *)"{\"uuid\":\"");
    payload = xstrcat(payload, uuid);
    payload = xstrcat(payload, (char *)"\",\"interval\":");
    sprintf(buf, "%d", MAINLOOP_TIMER);
    payload = xstrcat(payload, buf);
    payload = xstrcat(payload, (char *)",\"properties\":{\"hardwaremake\":\"Unknown\",\"hardwaremodel\":\"");
    if (chip_info.model == CHIP_ESP32)
       payload = xstrcat(payload, (char *)"ESP32");
    else if (chip_info.model == CHIP_ESP32S2)
       payload = xstrcat(payload, (char *)"ESP32-S2");
    else if (chip_info.model == CHIP_ESP32S3)
        payload = xstrcat(payload, (char *)"ESP32-S3");
    else if (chip_info.model == CHIP_ESP32C3)
        payload = xstrcat(payload, (char *)"ESP32-C3");
    else if (chip_info.model == CHIP_ESP32C2)
        payload = xstrcat(payload, (char *)"ESP32-C2");
    else if (chip_info.model == CHIP_ESP32C6)
        payload = xstrcat(payload, (char *)"ESP32-C6");
    else if (chip_info.model == CHIP_ESP32H2)
        payload = xstrcat(payload, (char *)"ESP32-H2");
    else if (chip_info.model == CHIP_POSIX_LINUX)
       payload = xstrcat(payload, (char *)"Posix Linux");
    else
       payload = xstrcat(payload, (char *)"Unknown");
    sprintf(buf, " %d cores rev %d, WiFi bgn", chip_info.cores, chip_info.revision);
    payload = xstrcat(payload, buf);
    payload = xstrcat(payload, (char *)"\",\"os\":\"esp-idf\",\"os_version\":\"");
    payload = xstrcat(payload, (char *)esp_get_idf_version());
    payload = xstrcat(payload, (char *)"\",\"FW\":\"");
    payload = xstrcat(payload, (char *)app_desc->version);
    payload = xstrcat(payload, (char *)"\"}");

    if (xSemaphoreTake(xSemaphoreDS18B20, 10) == pdTRUE) {
        payload = xstrcat(payload, (char *)",\"THB\":{\"temperature\":");
        sprintf(buf, "%.3f", ds18b20_state->sensor[0].temperature);
        payload = xstrcat(payload, buf);
        payload = xstrcat(payload, (char *)"}");
	xSemaphoreGive(xSemaphoreDS18B20);
    } else {
	ESP_LOGE(TAG, "publishNode() lock DS18B20 error");
    }

    if (xSemaphoreTake(xSemaphoreWiFi, 25) == pdTRUE) {
	payload = xstrcat(payload, (char *)",\"net\":{\"address\":\"");
	payload = xstrcat(payload, wifi_state->STA_ip);
        payload = xstrcat(payload, (char *)"\",\"ifname\":\"sta\",\"ssid\":\"");
	payload = xstrcat(payload, wifi_state->STA_ssid);
	payload = xstrcat(payload, (char *)"\",\"rssi\":");
	sprintf(buf, "%d", wifi_state->STA_rssi);
	payload = xstrcat(payload, buf);
	payload = xstrcat(payload, (char *)",\"bssid\":\"");
	payload = xstrcat(payload, wifi_state->STA_bssid);
	payload = xstrcat(payload, (char *)"\",\"channel\":");
	sprintf(buf, "%d", wifi_state->STA_channel);
	payload = xstrcat(payload, buf);
        payload = xstrcat(payload, (char *)"}");
	xSemaphoreGive(xSemaphoreWiFi);
    } else {
	ESP_LOGE(TAG, "publishNode() lock WiFi error");
    }

    payload = xstrcat(payload, (char *)"}}");
    // Only NBIRTH messages, no NDATA in this project.
    topic = topic_base((char *)"NBIRTH");
    publisher(topic, payload);
    free(topic);
    topic = NULL;
    free(payload);
    payload = NULL;
}



/**
 * @brief Publish a DLOG message for every unit that is on and has no alarm.
 *
 * The unit record, including its mode and alarm fields, is only read while
 * the units semaphore is held. The semaphore is released before the message
 * is handed to publisher(), so other tasks are not kept waiting on the units
 * table while the message is being sent.
 */
void publishLogs(void)
{
    char        *topic = NULL, *payload = NULL, buf[64];

    for (int i = 0; i < (int)(sizeof(units) / sizeof(units[0])); i++) {
        if (xSemaphoreTake(xSemaphoreUnits, 25) != pdTRUE) {
            ESP_LOGE(TAG, "publishLogs() lock error unit %d", i);
            continue;
        }

        if (units[i].mode && ! units[i].alarm) {
            payload = payload_header();
            payload = xstrcat(payload, (char *)"{\"uuid\":\"");
            payload = xstrcat(payload, units[i].uuid);
            payload = xstrcat(payload, (char *)"\",\"temperature\":");
            snprintf(buf, sizeof(buf), "%.3f", units[i].temperature / 1000.0);
            payload = xstrcat(payload, buf);
            payload = xstrcat(payload, (char *)",\"pressure\":");
            snprintf(buf, sizeof(buf), "%.3f", units[i].pressure / 1000.0);
            payload = xstrcat(payload, buf);
            payload = xstrcat(payload, (char *)"}}");
            topic = topic_base((char *)"DLOG");
            topic = xstrcat(topic, (char *)"/");
            topic = xstrcat(topic, units[i].alias);
        }
        xSemaphoreGive(xSemaphoreUnits);

        /* A topic only exists if a log message was prepared for this unit. */
        if (topic != NULL)
            publisher(topic, payload);
        free(topic);
        topic = NULL;
        free(payload);
        payload = NULL;
    }
}



static esp_err_t mqtt_event_handler_cb(esp_mqtt_event_handle_t event)
{
    switch (event->event_id) {

        case MQTT_EVENT_CONNECTED:
            ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED");
	    xEventGroupSetBits(xEventGroupMQTT, TASK_MQTT_CONNECTED);
	    xEventGroupSetBits(xEventGroupMQTT, TASK_MQTT_STARTED);
            break;

        case MQTT_EVENT_DISCONNECTED:
            ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED");
	    xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECTED);
            break;

        case MQTT_EVENT_SUBSCRIBED:
            ESP_LOGI(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d", event->msg_id);
            break;

        case MQTT_EVENT_UNSUBSCRIBED:
            ESP_LOGI(TAG, "MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event->msg_id);
            break;

        case MQTT_EVENT_PUBLISHED:
            ESP_LOGI(TAG, "MQTT_EVENT_PUBLISHED, msg_id=%d", event->msg_id);
	    if (xSemaphoreTake(xSemaphorePcounter, 10) == pdTRUE) {
	    	if (count_pub) {
		    count_pub--;
	    	}
		xSemaphoreGive(xSemaphorePcounter);
	    } else {
        	ESP_LOGE(TAG, "mqtt_event_handler_cb(() lock error event");
	    }
            break;

        case MQTT_EVENT_DATA:
            ESP_LOGI(TAG, "MQTT_EVENT_DATA");
            printf("TOPIC=%.*s\r\n", event->topic_len, event->topic);
            printf("DATA=%.*s\r\n", event->data_len, event->data);
            break;

        case MQTT_EVENT_ERROR:
            ESP_LOGI(TAG, "MQTT_EVENT_ERROR");
            break;

	case MQTT_EVENT_BEFORE_CONNECT:
	    //ESP_LOGI(TAG, "MQTT_EVENT_BEFORE_CONNECT");
	    // Configure connection can be here.
	    break;

        default:
            ESP_LOGI(TAG, "Other event id:%d", event->event_id);
            break;
    }
    return ESP_OK;
}



/**
 * @brief Event handler registered with the MQTT client.
 *
 * Only the event data is used, it is passed on to mqtt_event_handler_cb().
 *
 * @param handler_args Not used.
 * @param base Not used.
 * @param event_id Not used.
 * @param event_data The MQTT event.
 */
static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data)
{
    esp_mqtt_event_handle_t event = event_data;

    (void)handler_args;
    (void)base;
    (void)event_id;

    (void)mqtt_event_handler_cb(event);
}



/**
 * @brief The FreeRTOS task that maintains the MQTT connection.
 *
 * Creates the publish counter semaphore, the MQTT event group and the MQTT
 * client. After that it waits forever for TASK_MQTT_CONNECT and
 * TASK_MQTT_DISCONNECT requests, see connect_mqtt(). A connect request
 * builds the broker uri from the project configuration and starts the
 * client, or asks for a reconnect if a session was started before. A
 * disconnect request stops the client.
 *
 * @param pvParameter Not used.
 */
void task_mqtt(void *pvParameter)
{
    esp_err_t   err;
    char        *uri = NULL, port[12];  /* Room for any int plus the terminator. */
    EventBits_t uxBits;

    (void)pvParameter;

    ESP_LOGI(TAG, "Starting MQTT task");
    xSemaphorePcounter = xSemaphoreCreateMutex();

    /* Event group for the connection requests and the connection state. */
    xEventGroupMQTT = xEventGroupCreate();

    /* Placeholder uri, the real one is set on each connect request. */
    esp_mqtt_client_config_t mqtt_cfg = {
        .broker = {
            .address.uri = "mqtt://localhost",
        },
    };
    client = esp_mqtt_client_init(&mqtt_cfg);
    if (client == NULL)
        ESP_LOGE(TAG, "esp_mqtt_client_init() failed");
    esp_mqtt_client_register_event(client, ESP_EVENT_ANY_ID, mqtt_event_handler, client);

    /*
     * Task loop forever.
     */
    while (1) {

        /* The request bits are not cleared on exit, that is done below. */
        uxBits = xEventGroupWaitBits(xEventGroupMQTT, TASK_MQTT_CONNECT | TASK_MQTT_DISCONNECT, pdFALSE, pdFALSE, portMAX_DELAY );

        if (uxBits & TASK_MQTT_CONNECT) {
            if (! (xEventGroupGetBits(xEventGroupMQTT) & TASK_MQTT_CONNECTED)) {
                if (strlen(CONFIG_MQTT_SERVER)) {
                    /* mqtt://[user:pass@]server[:port] */
                    uri = xstrcpy((char *)"mqtt://");
                    if (strlen(CONFIG_MQTT_USER) && strlen(CONFIG_MQTT_PASS)) {
                        uri = xstrcat(uri, CONFIG_MQTT_USER);
                        uri = xstrcat(uri, (char *)":");
                        uri = xstrcat(uri, CONFIG_MQTT_PASS);
                        uri = xstrcat(uri, (char *)"@");
                    }
                    uri = xstrcat(uri, CONFIG_MQTT_SERVER);
                    if (CONFIG_MQTT_PORT != 1883) {
                        uri = xstrcat(uri, (char *)":");
                        snprintf(port, sizeof(port), "%d", CONFIG_MQTT_PORT);
                        uri = xstrcat(uri, port);
                    }
                } else {
                    uri = xstrcpy((char *)"mqtt://iot.eclipse.org:1883");
                }

                ESP_LOGI(TAG, "Request MQTT connect %s", uri);
                err = esp_mqtt_client_set_uri(client, uri);
                if (err != ESP_OK) {
                    /* Do not connect with an uri that was not accepted. */
                    ESP_LOGE(TAG, "Set uri %s", esp_err_to_name(err));
                } else if (xEventGroupGetBits(xEventGroupMQTT) & TASK_MQTT_STARTED) {
                    /*
                     * Existing session.
                     * NOTE(review): TASK_MQTT_STARTED is never cleared, also
                     * not after esp_mqtt_client_stop() below. Confirm that a
                     * reconnect is accepted by a stopped client.
                     */
                    err = esp_mqtt_client_reconnect(client);
                    if (err != ESP_OK)
                        ESP_LOGE(TAG, "Reconnect result %s", esp_err_to_name(err));
                } else {
                    /* New session */
                    err = esp_mqtt_client_start(client);
                    if (err != ESP_OK)
                        ESP_LOGE(TAG, "Start result %s", esp_err_to_name(err));
                }
                free(uri);
                uri = NULL;
            } else {
                ESP_LOGI(TAG, "Request MQTT connect but already connected.");
            }
            xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECT);

        } else if (uxBits & TASK_MQTT_DISCONNECT) {
            ESP_LOGI(TAG, "Request MQTT disconnect");
            err = esp_mqtt_client_stop(client);
            if (err != ESP_OK)
                ESP_LOGW(TAG, "Stop result %s", esp_err_to_name(err));
            xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_DISCONNECT);
            /* TASK_MQTT_CONNECTED is left to the MQTT_EVENT_DISCONNECTED handler. */
            ESP_LOGI(TAG, "Request MQTT disconnect done");
        }
    }
}

mercurial