main/task_mqtt.c

Thu, 30 Mar 2023 17:05:05 +0200

author
Michiel Broek <mbroek@mbse.eu>
date
Thu, 30 Mar 2023 17:05:05 +0200
changeset 5
b1f38105ca7e
child 6
bad3414f7bc4
permissions
-rw-r--r--

Added task MQTT and some utilities. Added more power measurement variables and code. INA219 measurements are saved in the State record.

/**
 * @file task_mqtt.c
 * @brief The FreeRTOS task to maintain MQTT connections.
 */


#include "config.h"


static const char		*TAG = "task_mqtt";

EventGroupHandle_t		xEventGroupMQTT;	///< Events MQTT task
SemaphoreHandle_t		xSemaphorePcounter;	///< Publish counter semaphore.
int				count_pub = 0;		///< Outstanding published messages.
esp_mqtt_client_handle_t	client;			///< MQTT client handle

const int TASK_MQTT_CONNECT = BIT0;			///< Request MQTT connection
const int TASK_MQTT_DISCONNECT = BIT1;			///< Request MQTT disconnect
const int TASK_MQTT_CONNECTED = BIT2;			///< MQTT is connected

const char			*sensState[] = { "OK", "ERROR" };	///< Sensor state strings
const char			*unitMode[] = { "OFF", "ON" };		///< Units state strings

extern BMP280_State		*bmp280_state;		///< BMP280 state
extern SemaphoreHandle_t        xSemaphoreBMP280;	///< BMP280 lock semaphore
extern INA219_State		*ina219_state;		///< INA219 state
extern SemaphoreHandle_t	xSemaphoreINA219;	///< INA219 lock semaphore
extern WIFI_State		*wifi_state;		///< WiFi state
extern SemaphoreHandle_t	xSemaphoreWiFi;		///< WiFi lock semaphore
extern const esp_app_desc_t	*app_desc;

extern uint32_t			Alarm;
extern float			batteryState;
extern float			batteryVolts;
extern float			batteryCurrent;
extern float			batteryPower;
extern float			solarVolts;
extern float			solarCurrent;
extern float			solarPower;


void connect_mqtt(bool state)
{
    if (state)
    	xEventGroupSetBits(xEventGroupMQTT, TASK_MQTT_CONNECT);
    else
    	xEventGroupSetBits(xEventGroupMQTT, TASK_MQTT_DISCONNECT);
}



bool ready_mqtt(void)
{
    if (xEventGroupGetBits(xEventGroupMQTT) & TASK_MQTT_CONNECTED)
        return true;
    return false;
}



/**
 * @brief Generate the mqtt payload header.
 * @return Allocated character string with the header.
 */
char *payload_header(void)
{
    char        *tmp;

    tmp = xstrcpy((char *)"{\"metric\":");
    return tmp;
}



/**
 * @brief Generate the mqtt topic base part.
 * @return The topic string allocated in memory.
 */
char *topic_base(void)
{
    char        *tmp;

#ifdef CONFIG_CODE_PRODUCTION
    tmp = xstrcpy((char *)"balkon/");
#endif
#ifdef CONFIG_CODE_TESTING
    tmp = xstrcpy((char *)"wemos/");
#endif

    return tmp;
}



/**
 * @brief The mqtt generic publish function.
 * @param topic The topic of the mqtt message.
 * @param payload The payload of the mqtt message.
 */
void publisher(char *topic, char *payload)
{
    /*
     * First count, then sent the data.
     */
    if (xSemaphoreTake(xSemaphorePcounter, 10) == pdTRUE) {
        count_pub++;
        xSemaphoreGive(xSemaphorePcounter);
    } else {
        ESP_LOGE(TAG, "publisher() counter lock");
    }

    if (payload)
        esp_mqtt_client_publish(client, topic, payload, strlen(payload), 1, 0);
    else
	esp_mqtt_client_publish(client, topic, NULL, 0, 1, 0);
}



void publish(void)
{
    char        *topic = NULL, *payload = NULL, buf[64];

    // {"system":{"battery":70,"alarm":0,"version":"0.2.6","rssi":-56,"wifi":88,"light":{"lux":12.34,"gain":2}},"solar":{"voltage":13.98,"current":234.1,"power":3.272718},"battery":{"voltage":13.21,"current":4.942289,"power":0.065288},"real":{"current":229.1577},"TH":{"temperature":20.2,"humidity":48.3},"output":{"relay1":0,"relay2":0,"dimmer3":0,"dimmer4":0}}
    //
    payload = payload_header();
    payload = xstrcat(payload, (char *)"{\"system\":{\"battery\":");
    sprintf(buf, "%.0f", batteryState);
    payload = xstrcat(payload, buf);
    payload = xstrcat(payload, (char *)",\"alarm\":");
    sprintf(buf, "%ld", Alarm);
    payload = xstrcat(payload, buf);
    payload = xstrcat(payload, (char *)",\"version\":\"");
    payload = xstrcat(payload, (char *)app_desc->version);
    payload = xstrcat(payload, (char *)",\"rssi\":");

    payload = xstrcat(payload, (char *)",\"light\":{\"lux\":");

    payload = xstrcat(payload, (char *)",\"gain\":");

    payload = xstrcat(payload, (char *)"}},\"solar\":{\"voltage\":");
    sprintf(buf, "%.2f", solarVolts);
    payload = xstrcat(payload, buf);
    payload = xstrcat(payload, (char *)",\"current\":");
    sprintf(buf, "%.1f", solarCurrent);
    payload = xstrcat(payload, buf);
    payload = xstrcat(payload, (char *)",\"power\":");
    sprintf(buf, "%.3f", solarPower / 1000.0);
    payload = xstrcat(payload, buf);
    payload = xstrcat(payload, (char *)"},\"battery\":{\"voltage\":");
    sprintf(buf, "%.2f", batteryVolts);
    payload = xstrcat(payload, buf);
    payload = xstrcat(payload, (char *)",\"current\":");
    sprintf(buf, "%.1f", batteryCurrent);
    payload = xstrcat(payload, buf);
    payload = xstrcat(payload, (char *)",\"power\":");
    sprintf(buf, "%.3f", batteryPower / 1000.0);
    payload = xstrcat(payload, buf);
    payload = xstrcat(payload, (char *)"},\"real\":{\"current\":");
    payload = xstrcat(payload, (char *)"},\"TB\":{\"temperature\":");
    if (xSemaphoreTake(xSemaphoreBMP280, 25) == pdTRUE) {
	sprintf(buf, "%.2f", bmp280_state->temperature);
	payload = xstrcat(payload, buf);
	payload = xstrcat(payload, (char *)",\"pressure\":");
	sprintf(buf, "%.1f", bmp280_state->pressure / 100.0);
	payload = xstrcat(payload, buf);
	xSemaphoreGive(xSemaphoreBMP280);
    }
    payload = xstrcat(payload, (char *)"},\"output\":{\"relay1\":");

    payload = xstrcat(payload, (char *)",\"relay2\":");

    payload = xstrcat(payload, (char *)",\"dimmer3\":");

    payload = xstrcat(payload, (char *)",\"dimmer4\":");

    payload = xstrcat(payload, (char *)"}}");
    topic = topic_base();
    topic = xstrcat(topic, (char *)"status");
    publisher(topic, payload);
    free(topic);
    topic = NULL;
    free(payload);
    payload = NULL;
}



static esp_err_t mqtt_event_handler_cb(esp_mqtt_event_handle_t event)
{
    switch (event->event_id) {

        case MQTT_EVENT_CONNECTED:
            ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED");
	    xEventGroupSetBits(xEventGroupMQTT, TASK_MQTT_CONNECTED);
            break;

        case MQTT_EVENT_DISCONNECTED:
            ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED");
	    xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECTED);
            break;

        case MQTT_EVENT_SUBSCRIBED:
            ESP_LOGI(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d", event->msg_id);
            break;

        case MQTT_EVENT_UNSUBSCRIBED:
            ESP_LOGI(TAG, "MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event->msg_id);
            break;

        case MQTT_EVENT_PUBLISHED:
            ESP_LOGI(TAG, "MQTT_EVENT_PUBLISHED, msg_id=%d", event->msg_id);
	    if (xSemaphoreTake(xSemaphorePcounter, 10) == pdTRUE) {
	    	if (count_pub) {
		    count_pub--;
	    	}
		xSemaphoreGive(xSemaphorePcounter);
	    } else {
        	ESP_LOGE(TAG, "mqtt_event_handler_cb(() lock error event");
	    }
            break;

        case MQTT_EVENT_DATA:
            ESP_LOGI(TAG, "MQTT_EVENT_DATA");
            printf("TOPIC=%.*s\r\n", event->topic_len, event->topic);
            printf("DATA=%.*s\r\n", event->data_len, event->data);
            break;

        case MQTT_EVENT_ERROR:
            ESP_LOGI(TAG, "MQTT_EVENT_ERROR");
            break;

	case MQTT_EVENT_BEFORE_CONNECT:
	    ESP_LOGI(TAG, "MQTT_EVENT_BEFORE_CONNECT");
	    // Configure connection can be here.
	    break;

        default:
            ESP_LOGI(TAG, "Other event id:%d", event->event_id);
            break;
    }
    return ESP_OK;
}



static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data) {
    mqtt_event_handler_cb(event_data);
}



/*
 * Task to read temperature sensors on request.
 */
void task_mqtt(void *pvParameter)
{
    esp_err_t	err;
    char	*uri = NULL, port[11];

    ESP_LOGI(TAG, "Starting MQTT task");
    xSemaphorePcounter = xSemaphoreCreateMutex();

    /* event handler and event group for the wifi driver */
    xEventGroupMQTT = xEventGroupCreate();
    EventBits_t uxBits;

    esp_mqtt_client_config_t mqtt_cfg = {
        .broker.address.uri = "mqtt://localhost",
    };
    client = esp_mqtt_client_init(&mqtt_cfg);
    esp_mqtt_client_register_event(client, ESP_EVENT_ANY_ID, mqtt_event_handler, client);

    /*
     * Task loop forever.
     */
    while (1) {

	uxBits = xEventGroupWaitBits(xEventGroupMQTT, TASK_MQTT_CONNECT | TASK_MQTT_DISCONNECT, pdFALSE, pdFALSE, portMAX_DELAY );

	if (uxBits & TASK_MQTT_CONNECT) {
	    uri = xstrcpy((char *)"mqtt://");
	    if (strlen(CONFIG_MQTT_USER) && strlen(CONFIG_MQTT_PASS)) {
		uri = xstrcat(uri, CONFIG_MQTT_USER);
		uri = xstrcat(uri, (char *)":");
		uri = xstrcat(uri, CONFIG_MQTT_PASS);
		uri = xstrcat(uri, (char *)"@");
	    }
	    uri = xstrcat(uri, CONFIG_MQTT_SERVER);
	    if (CONFIG_MQTT_PORT != 1883) {
		uri = xstrcat(uri, (char *)":");
		sprintf(port, "%d", CONFIG_MQTT_PORT);
		uri = xstrcat(uri, port);
	    }

	    ESP_LOGI(TAG, "Request MQTT connect %s", uri);
	    err = esp_mqtt_client_set_uri(client, uri);
            if (err != ESP_OK)
                ESP_LOGE(TAG, "Set uri %s", esp_err_to_name(err));
	    err = esp_mqtt_client_start(client);
	    if (err != ESP_OK)
	    	ESP_LOGE(TAG, "Result %s", esp_err_to_name(err));
	    xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECT);

	} else if (uxBits & TASK_MQTT_DISCONNECT) {
	    ESP_LOGI(TAG, "Request MQTT disconnect");
	    esp_mqtt_client_stop(client);
	    xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_DISCONNECT);
	    xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECTED);
	}
    }
}

mercurial