main/task_mqtt.c

Sun, 16 Apr 2023 12:27:12 +0200

author
Michiel Broek <mbroek@mbse.eu>
date
Sun, 16 Apr 2023 12:27:12 +0200
changeset 30
7448b8dd4288
parent 26
29dc2064e2ce
child 32
84e54b14e7db
permissions
-rw-r--r--

Preparations for BLE GATT. Added extra time after INA219 is powered on before measurement. Reduced LEDC frequency to 60 Hz, that makes the LED lights less nervous. Hardware mod on output 4, now needs external pulldown resistor.

/**
 * @file task_mqtt.c
 * @brief The FreeRTOS task to maintain MQTT connections.
 */


#include "config.h"


static const char		*TAG = "task_mqtt";	///< Log tag used by all ESP_LOGx calls in this file.

EventGroupHandle_t		xEventGroupMQTT;	///< Events MQTT task
SemaphoreHandle_t		xSemaphorePcounter;	///< Publish counter semaphore.
int				count_pub = 0;		///< Outstanding published messages.
esp_mqtt_client_handle_t	client;			///< MQTT client handle

/*
 * Bits in xEventGroupMQTT. The two request bits are set by request_mqtt()
 * and cleared by task_mqtt(); the two status bits follow the broker events.
 */
const int TASK_MQTT_CONNECT = BIT0;			///< Request MQTT connection
const int TASK_MQTT_DISCONNECT = BIT1;			///< Request MQTT disconnect
const int TASK_MQTT_CONNECTED = BIT2;			///< MQTT is connected
const int TASK_MQTT_DISCONNECTED = BIT3;		///< MQTT is disconnected

const char			*sensState[] = { "OK", "ERROR" };	///< Sensor state strings
const char			*unitMode[] = { "OFF", "ON" };		///< Units state strings

/* State records owned by other tasks; publish() reads them under their lock. */
extern BMP280_State		*bmp280_state;		///< BMP280 state
extern SemaphoreHandle_t        xSemaphoreBMP280;	///< BMP280 lock semaphore
extern INA219_State		*ina219_state;		///< INA219 state
extern SemaphoreHandle_t	xSemaphoreINA219;	///< INA219 lock semaphore
extern APDS9930_State		*apds9930_state;	///< APDS9930 state
extern SemaphoreHandle_t	xSemaphoreAPDS9930;	///< APDS9930 lock semaphore
extern WIFI_State		*wifi_state;		///< WiFi state
extern SemaphoreHandle_t	xSemaphoreWiFi;		///< WiFi lock semaphore
extern const esp_app_desc_t	*app_desc;		///< Application description (defined elsewhere).

/* Values published in the status message by publish(). */
extern uint32_t			Alarm;			///< Published as system "alarm".
extern int			batteryState;		///< Published as system "battery".
extern float			batteryVolts;		///< Published as battery "voltage".
extern float			batteryCurrent;		///< Published as battery "current".
extern float			batteryPower;		///< Published as battery "power" after division by 1000.
extern float			solarVolts;		///< Published as solar "voltage".
extern float			solarCurrent;		///< Published as solar "current".
extern float			solarPower;		///< Published as solar "power" after division by 1000.

/* Output values; written from received "output/set/N" messages and published by publish(). */
extern uint8_t			Relay1;			///< Output 1, stored in NVS as "out1".
extern uint8_t			Relay2;			///< Output 2, stored in NVS as "out2".
extern uint8_t			Dimmer3;		///< Output 3, stored in NVS as "out3".
extern uint8_t			Dimmer4;		///< Output 4, stored in NVS as "out4".


/**
 * @brief Ask the MQTT task to connect to or disconnect from the broker.
 *        Requests that make no sense in the current state are only logged.
 * @param state true to request a connection, false to request a disconnect.
 */
void request_mqtt(bool state)
{
    if (state) {
        /* Connect request: only raise it when not connected yet. */
        if ((xEventGroupGetBits(xEventGroupMQTT) & TASK_MQTT_CONNECTED) == 0) {
            xEventGroupSetBits(xEventGroupMQTT, TASK_MQTT_CONNECT);
            return;
        }
        ESP_LOGW(TAG, "request_mqtt(true) but already connected");
        return;
    }

    /* Disconnect request: refuse when one is pending or nothing is connected. */
    if (xEventGroupGetBits(xEventGroupMQTT) & TASK_MQTT_DISCONNECT) {
        ESP_LOGW(TAG, "request_mqtt(false) already in progress");
        return;
    }
    if ((xEventGroupGetBits(xEventGroupMQTT) & TASK_MQTT_CONNECTED) == 0) {
        ESP_LOGW(TAG, "request_mqtt(false) but already disconnected");
        return;
    }
    xEventGroupSetBits(xEventGroupMQTT, TASK_MQTT_DISCONNECT);
}



/**
 * @brief Test if the MQTT client is connected.
 * @return true if the TASK_MQTT_CONNECTED bit is set, else false.
 */
bool ready_mqtt(void)
{
    return (xEventGroupGetBits(xEventGroupMQTT) & TASK_MQTT_CONNECTED) != 0;
}



/**
 * @brief Wait until the MQTT connection is reported as disconnected.
 *        Returns at once if the connection is not up. The
 *        TASK_MQTT_DISCONNECTED bit is cleared on exit when it was seen.
 * @param time The maximum time to wait, divided by portTICK_PERIOD_MS
 *             to get the number of ticks.
 */
void wait_mqtt(int time)
{
    if ((xEventGroupGetBits(xEventGroupMQTT) & TASK_MQTT_CONNECTED) == 0) {
        return;
    }

    /* The result is not used, only the blocking until the bit shows up. */
    xEventGroupWaitBits(xEventGroupMQTT, TASK_MQTT_DISCONNECTED, pdTRUE, pdFALSE, time / portTICK_PERIOD_MS);
}



/**
 * @brief Generate the mqtt topic base part.
 *        The prefix depends on the build: "wemos/" for CONFIG_CODE_TESTING,
 *        "balkon/" for CONFIG_CODE_PRODUCTION. If both are defined the
 *        testing prefix wins, which is what the caller got before too.
 * @return The topic string allocated in memory, the caller must free it.
 *         NULL if neither build type is configured.
 */
char *topic_base(void)
{
    /*
     * Start with a defined value, without it an unconfigured build
     * would return an indeterminate pointer.
     */
    char        *tmp = NULL;

    /* One allocation only; two separate #ifdef blocks leaked the first string. */
#if defined(CONFIG_CODE_TESTING)
    tmp = xstrcpy((char *)"wemos/");
#elif defined(CONFIG_CODE_PRODUCTION)
    tmp = xstrcpy((char *)"balkon/");
#endif

    return tmp;
}



/**
 * @brief The mqtt generic publish function.
 *        The message is sent with QoS 1 and without the retain flag. The
 *        outstanding message counter count_pub is raised before sending, the
 *        MQTT_EVENT_PUBLISHED event lowers it again. If the client refuses
 *        the message no such event will follow, so the count is undone here.
 * @param topic The topic of the mqtt message.
 * @param payload The payload of the mqtt message, NULL for an empty message.
 */
void publisher(char *topic, char *payload)
{
    bool        counted = false;
    int         msg_id;

    /*
     * First count, then send the data.
     */
    if (xSemaphoreTake(xSemaphorePcounter, 10) == pdTRUE) {
        count_pub++;
        counted = true;
        xSemaphoreGive(xSemaphorePcounter);
    } else {
        ESP_LOGE(TAG, "publisher() counter lock");
    }

    if (payload)
        msg_id = esp_mqtt_client_publish(client, topic, payload, strlen(payload), 1, 0);
    else
        msg_id = esp_mqtt_client_publish(client, topic, NULL, 0, 1, 0);

    if (msg_id >= 0)
        return;

    /*
     * Negative result, the message was not accepted. Only undo the
     * count if it was really raised above.
     */
    ESP_LOGE(TAG, "publisher() publish %s failed (%d)", topic ? topic : "(null)", msg_id);
    if (counted && (xSemaphoreTake(xSemaphorePcounter, 10) == pdTRUE)) {
        if (count_pub) {
            count_pub--;
        }
        xSemaphoreGive(xSemaphorePcounter);
    }
}



/**
 * @brief Placeholder for handling received event data.
 *        Nothing is done with the arguments.
 * @param check Not used.
 * @param topic Not used.
 * @param data Not used.
 * @return Always false.
 */
bool do_event_data(char *check, char *topic, char *data)
{
    (void)check;
    (void)topic;
    (void)data;

    return false;
}


void publish(void)
{
    char        		*topic = NULL, *payload = NULL, buf[64];
    const esp_app_desc_t	*app_desc = esp_app_get_description();
    int				Quality = 0;

    // {"system":{"battery":40,"alarm":0,"version":"0.3.1","rssi":-77,"wifi":46,"light":{"lux":0.1,"gain":3,"agl":0}},"solar":{"voltage":0.31,"current":0,"power":0},"battery":{"voltage":12.27,"current":53.5,"power":0.657},"real":{"current":-53.5},"TH":{"temperature":8.88,"humidity":0},"output":{"relay1":0,"relay2":0,"dimmer3":90,"dimmer4":80}}
    payload = xstrcpy((char *)"{\"system\":{\"battery\":");
    sprintf(buf, "%d", batteryState);
    payload = xstrcat(payload, buf);
    payload = xstrcat(payload, (char *)",\"alarm\":");
    sprintf(buf, "%ld", Alarm);
    payload = xstrcat(payload, buf);
    payload = xstrcat(payload, (char *)",\"version\":\"");
    payload = xstrcat(payload, (char *)app_desc->version);
    payload = xstrcat(payload, (char *)"\",\"rssi\":");
    if (xSemaphoreTake(xSemaphoreWiFi, 25) == pdTRUE) {
	sprintf(buf, "%d", wifi_state->STA_rssi);
	payload = xstrcat(payload, buf);
	payload = xstrcat(payload, (char *)",\"wifi\":");
	if (wifi_state->STA_rssi <= -100) {
	    Quality = 0;
	} else if (wifi_state->STA_rssi >= -50) {
	    Quality = 100;
	} else {
	    Quality = 2 * (wifi_state->STA_rssi + 100);
	}
	sprintf(buf, "%d", Quality);
	payload = xstrcat(payload, buf);
	xSemaphoreGive(xSemaphoreWiFi);
    }
    payload = xstrcat(payload, (char *)",\"light\":{\"lux\":");
    if (xSemaphoreTake(xSemaphoreAPDS9930, 25) == pdTRUE) {
	sprintf(buf, "%.1f", apds9930_state->ambient_light);
	payload = xstrcat(payload, buf);
	payload = xstrcat(payload, (char *)",\"gain\":");
	sprintf(buf, "%d", apds9930_state->gain);
	payload = xstrcat(payload, buf);
	payload = xstrcat(payload, (char *)",\"agl\":");
	sprintf(buf, "%d", apds9930_state->aglbit);
	payload = xstrcat(payload, buf);
	xSemaphoreGive(xSemaphoreAPDS9930);
    }
    payload = xstrcat(payload, (char *)"}},\"solar\":{\"voltage\":");
    sprintf(buf, "%.2f", solarVolts);
    payload = xstrcat(payload, buf);
    payload = xstrcat(payload, (char *)",\"current\":");
    sprintf(buf, "%.1f", solarCurrent);
    payload = xstrcat(payload, buf);
    payload = xstrcat(payload, (char *)",\"power\":");
    sprintf(buf, "%.3f", solarPower / 1000.0);
    payload = xstrcat(payload, buf);
    payload = xstrcat(payload, (char *)"},\"battery\":{\"voltage\":");
    sprintf(buf, "%.2f", batteryVolts);
    payload = xstrcat(payload, buf);
    payload = xstrcat(payload, (char *)",\"current\":");
    sprintf(buf, "%.1f", batteryCurrent);
    payload = xstrcat(payload, buf);
    payload = xstrcat(payload, (char *)",\"power\":");
    sprintf(buf, "%.3f", batteryPower / 1000.0);
    payload = xstrcat(payload, buf);
    payload = xstrcat(payload, (char *)"},\"real\":{\"current\":");
    sprintf(buf, "%.1f", (0 - batteryCurrent) + solarCurrent);
    payload = xstrcat(payload, buf);
    payload = xstrcat(payload, (char *)"},\"TB\":{\"temperature\":");
    if (xSemaphoreTake(xSemaphoreBMP280, 25) == pdTRUE) {
	sprintf(buf, "%.2f", bmp280_state->temperature);
	payload = xstrcat(payload, buf);
	payload = xstrcat(payload, (char *)",\"pressure\":");
	sprintf(buf, "%.1f", bmp280_state->pressure / 100.0);
	payload = xstrcat(payload, buf);
	xSemaphoreGive(xSemaphoreBMP280);
    }
    payload = xstrcat(payload, (char *)"},\"output\":{\"relay1\":");
    sprintf(buf, "%d", Relay1);
    payload = xstrcat(payload, buf);
    payload = xstrcat(payload, (char *)",\"relay2\":");
    sprintf(buf, "%d", Relay2);
    payload = xstrcat(payload, buf);
    payload = xstrcat(payload, (char *)",\"dimmer3\":");
    sprintf(buf, "%d", Dimmer3);
    payload = xstrcat(payload, buf);
    payload = xstrcat(payload, (char *)",\"dimmer4\":");
    sprintf(buf, "%d", Dimmer4);
    payload = xstrcat(payload, buf);
    payload = xstrcat(payload, (char *)"}}");
    topic = topic_base();
    topic = xstrcat(topic, (char *)"status");
    ESP_LOGI(TAG, "%s %s", topic, payload);
    publisher(topic, payload);
    free(topic);
    topic = NULL;
    free(payload);
    payload = NULL;
}



/**
 * @brief Process one event from the MQTT client.
 *        On connect the "output/set/#" topic below the topic base is
 *        subscribed and the connected state is set in the event group.
 *        On disconnect the state bits are reversed. A published event
 *        lowers the outstanding message counter. Received data on
 *        "output/set/1" .. "output/set/4" sets the matching output and
 *        stores the new value in NVS.
 * @param event The MQTT event to process.
 * @return Always ESP_OK.
 */
static esp_err_t mqtt_event_handler_cb(esp_mqtt_event_handle_t event)
{
    char        *subscr = NULL, *check = NULL;

    switch (event->event_id) {

        case MQTT_EVENT_CONNECTED: {
            ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED");
            subscr = topic_base();
            subscr = xstrcat(subscr, (char *)"output/set/#");
            int msgid = esp_mqtt_client_subscribe(client, subscr, 0);
            ESP_LOGD(TAG, "Subscribe `%s' id %d", subscr, msgid);
            free(subscr);
            subscr = NULL;
            xEventGroupSetBits(xEventGroupMQTT, TASK_MQTT_CONNECTED);
            xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_DISCONNECTED);
            break;
        }

        case MQTT_EVENT_DISCONNECTED:
            ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED");
            xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECTED);
            xEventGroupSetBits(xEventGroupMQTT, TASK_MQTT_DISCONNECTED);
            break;

        case MQTT_EVENT_SUBSCRIBED:
            ESP_LOGD(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d", event->msg_id);
            break;

        case MQTT_EVENT_UNSUBSCRIBED:
            ESP_LOGI(TAG, "MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event->msg_id);
            break;

        case MQTT_EVENT_PUBLISHED:
            /* One message less outstanding, never count below zero. */
            if (xSemaphoreTake(xSemaphorePcounter, 10) == pdTRUE) {
                if (count_pub) {
                    count_pub--;
                }
                xSemaphoreGive(xSemaphorePcounter);
            } else {
                ESP_LOGE(TAG, "mqtt_event_handler_cb(() lock error event");
            }
            ESP_LOGI(TAG, "MQTT_EVENT_PUBLISHED, msg_id=%d, %d msgs in queue", event->msg_id, count_pub);
            break;

        /* Braces: a declaration may not directly follow a case label. */
        case MQTT_EVENT_DATA: {
            static uint8_t * const      outputs[] = { &Relay1, &Relay2, &Dimmer3, &Dimmer4 };
            static const char * const   keys[] = { "out1", "out2", "out3", "out4" };
            char                        data[65], topic[128];
            size_t                      last;
            int                         i;

            /*
             * Make terminated copies, the event strings are length counted.
             * The full buffer size is given to snprintf so that the longest
             * accepted string is not cut short by one character.
             */
            if (event->data_len < (int)sizeof(data))
                snprintf(data, sizeof(data), "%.*s", event->data_len, event->data);
            else
                data[0] = '\0';
            if (event->topic_len < (int)sizeof(topic))
                snprintf(topic, sizeof(topic), "%.*s", event->topic_len, event->topic);
            else
                topic[0] = '\0';
            ESP_LOGI(TAG, "MQTT_EVENT_DATA %s %s", topic, data);

            check = topic_base();
            check = xstrcat(check, (char *)"output/set/1");
            last = strlen(check) - 1;

            /*
             * Compare the whole topic. A compare over topic_len characters
             * only would let an empty or shortened topic, such as the
             * parent level that also matches the '#' subscription, set all
             * four outputs.
             */
            for (i = 0; i < 4; i++) {
                check[last] = (char)('1' + i);
                if (strcmp(check, topic) == 0) {
                    ESP_LOGD(TAG, "Got %s `%s' %d", check, data, atoi(data));
                    *outputs[i] = (uint8_t)atoi(data);
                    nvsio_write_u8((char *)keys[i], *outputs[i]);
                }
            }
            free(check);
            check = NULL;
            break;
        }

        case MQTT_EVENT_ERROR:
            ESP_LOGI(TAG, "MQTT_EVENT_ERROR");
            break;

        case MQTT_EVENT_BEFORE_CONNECT:
            ESP_LOGD(TAG, "MQTT_EVENT_BEFORE_CONNECT");
            // Configure connection can be here.
            break;

        default:
            ESP_LOGE(TAG, "Other event id:%d", event->event_id);
            break;
    }
    return ESP_OK;
}



/**
 * @brief Event loop entry for MQTT events, hands the event data to
 *        mqtt_event_handler_cb().
 * @param handler_args Not used.
 * @param base Not used.
 * @param event_id Not used.
 * @param event_data The event, passed on as MQTT event handle.
 */
static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data)
{
    esp_mqtt_event_handle_t     event = event_data;

    mqtt_event_handler_cb(event);
}



/**
 * @brief Task to connect to and disconnect from the MQTT broker on request.
 *        Creates the publish counter semaphore, the event group and the
 *        MQTT client, then waits forever for the TASK_MQTT_CONNECT and
 *        TASK_MQTT_DISCONNECT request bits.
 * @param pvParameter Not used.
 */
void task_mqtt(void *pvParameter)
{
    esp_err_t   err;
    char        *uri = NULL, port[11];

    ESP_LOGI(TAG, "Starting MQTT task");
    esp_log_level_set("MQTT_CLIENT", ESP_LOG_ERROR);
    xSemaphorePcounter = xSemaphoreCreateMutex();

    /* Event group for the MQTT requests and state, start as disconnected. */
    xEventGroupMQTT = xEventGroupCreate();
    EventBits_t uxBits;
    xEventGroupSetBits(xEventGroupMQTT, TASK_MQTT_DISCONNECTED);

    /* The uri is only a placeholder, the real one is set with each connect request. */
    esp_mqtt_client_config_t mqtt_cfg = {
        .broker.address.uri = "mqtt://localhost",
    };
    client = esp_mqtt_client_init(&mqtt_cfg);
    esp_mqtt_client_register_event(client, ESP_EVENT_ANY_ID, mqtt_event_handler, client);

    /*
     * Task loop forever.
     */
    while (1) {

        /* The request bits are not cleared here, each branch clears its own bit when done. */
        uxBits = xEventGroupWaitBits(xEventGroupMQTT, TASK_MQTT_CONNECT | TASK_MQTT_DISCONNECT, pdFALSE, pdFALSE, portMAX_DELAY );

        if (uxBits & TASK_MQTT_CONNECT) {
            /*
             * First build the connect uri.
             */
            uri = xstrcpy((char *)"mqtt://");
            if (strlen(CONFIG_MQTT_USER) && strlen(CONFIG_MQTT_PASS)) {
                uri = xstrcat(uri, CONFIG_MQTT_USER);
                uri = xstrcat(uri, (char *)":");
                uri = xstrcat(uri, CONFIG_MQTT_PASS);
                uri = xstrcat(uri, (char *)"@");
            }
            uri = xstrcat(uri, CONFIG_MQTT_SERVER);
            if (CONFIG_MQTT_PORT != 1883) {
                uri = xstrcat(uri, (char *)":");
                snprintf(port, sizeof(port), "%d", CONFIG_MQTT_PORT);
                uri = xstrcat(uri, port);
            }
            /* NOTE(review): this logs the broker password if one is configured. */
            ESP_LOGI(TAG, "Request MQTT connect %s", uri);

            /*
             * Connect to the broker.
             */
            err = esp_mqtt_client_set_uri(client, uri);
            if (err != ESP_OK)
                ESP_LOGE(TAG, "Set uri %s", esp_err_to_name(err));
            err = esp_mqtt_client_start(client);
            if (err != ESP_OK) {
                ESP_LOGE(TAG, "Result %s", esp_err_to_name(err));
            }

            /*
             * Release the uri, a new one is built with every connect
             * request and the old one was never freed.
             * NOTE(review): assumes esp_mqtt_client_set_uri() keeps its
             * own copy of the uri parts - confirm with the esp-mqtt docs.
             */
            free(uri);
            uri = NULL;
            xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECT);

        } else if (uxBits & TASK_MQTT_DISCONNECT) {
            ESP_LOGI(TAG, "Request MQTT disconnect");
            /*
             * Unsubscribe if connected
             */
            if (ready_mqtt()) {
                char *topic = topic_base();
                topic = xstrcat(topic, (char *)"output/set/#");
                esp_mqtt_client_unsubscribe(client, topic);
                free(topic);
            }

            /*
             * Disconnect from broker and wait until confirmed.
             */
            err = esp_mqtt_client_disconnect(client);
            if (err != ESP_OK) {
                ESP_LOGE(TAG, "Result %s", esp_err_to_name(err));
            }
            xEventGroupWaitBits(xEventGroupMQTT, TASK_MQTT_DISCONNECTED, pdTRUE, pdFALSE, 500 / portTICK_PERIOD_MS);

            /*
             * Finally stop the client because new connections start
             * with a 'esp_mqtt_client_start()' command.
             * This will take about 5 seconds, but we don't need the network.
             */
            err = esp_mqtt_client_stop(client);
            if (err != ESP_OK) {
                ESP_LOGE(TAG, "esp_mqtt_client_stop() result %s", esp_err_to_name(err));
            }

            /* Force the final state, also when the disconnect event was missed. */
            xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECTED);
            xEventGroupSetBits(xEventGroupMQTT, TASK_MQTT_DISCONNECTED);
            xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_DISCONNECT);
            ESP_LOGI(TAG, "Request MQTT disconnect done");
        }
    }
}

mercurial