main/task_mqtt.c

Sun, 10 Sep 2023 17:29:15 +0200

author
Michiel Broek <mbroek@mbse.eu>
date
Sun, 10 Sep 2023 17:29:15 +0200
changeset 37
50dbb626fbab
parent 32
84e54b14e7db
permissions
-rw-r--r--

Version 0.4.3. Attempt to fix the sunlight overflow of the APDS9930 sensor in the private part of the esp-idf-lib. Removed some error checks from functions that always return OK. Store light sensor registers in the state record and report the values in the json result string.

/**
 * @file task_mqtt.c
 * @brief The FreeRTOS task to maintain MQTT connections.
 */


#include "config.h"


static const char		*TAG = "task_mqtt";	///< Tag passed to the ESP_LOGx macros in this file

EventGroupHandle_t		xEventGroupMQTT;	///< Events MQTT task
SemaphoreHandle_t		xSemaphorePcounter;	///< Publish counter semaphore.
int				count_pub = 0;		///< Outstanding published messages.
esp_mqtt_client_handle_t	client;			///< MQTT client handle

const int TASK_MQTT_CONNECT = BIT0;			///< Request MQTT connection
const int TASK_MQTT_DISCONNECT = BIT1;			///< Request MQTT disconnect
const int TASK_MQTT_CONNECTED = BIT2;			///< MQTT is connected
const int TASK_MQTT_DISCONNECTED = BIT3;		///< MQTT is disconnected

const char			*sensState[] = { "OK", "ERROR" };	///< Sensor state strings
const char			*unitMode[] = { "OFF", "ON" };		///< Units state strings

extern TEMP_State		*temp_state;		///< Internal temperature state
extern SemaphoreHandle_t	xSemaphoreTEMP;		///< Internal temperature semaphore
extern BMP280_State		*bmp280_state;		///< BMP280 state
extern SemaphoreHandle_t        xSemaphoreBMP280;	///< BMP280 lock semaphore
extern INA219_State		*ina219_state;		///< INA219 state
extern SemaphoreHandle_t	xSemaphoreINA219;	///< INA219 lock semaphore
extern APDS9930_State		*apds9930_state;	///< APDS9930 state
extern SemaphoreHandle_t	xSemaphoreAPDS9930;	///< APDS9930 lock semaphore
extern WIFI_State		*wifi_state;		///< WiFi state
extern SemaphoreHandle_t	xSemaphoreWiFi;		///< WiFi lock semaphore
extern const esp_app_desc_t	*app_desc;		///< Application descriptor, defined elsewhere

extern uint32_t			Alarm;			///< Alarm value, published as "alarm"
extern int			batteryState;		///< Battery state, published as "battery" under "system"
extern float			batteryVolts;		///< Battery voltage, published as measured
extern float			batteryCurrent;		///< Battery current, published as measured
extern float			batteryPower;		///< Battery power, divided by 1000 when published
extern float			solarVolts;		///< Solar voltage, published as measured
extern float			solarCurrent;		///< Solar current, published as measured
extern float			solarPower;		///< Solar power, divided by 1000 when published

extern uint8_t			Relay1;			///< Output 1, set via topic output/set/1
extern uint8_t			Relay2;			///< Output 2, set via topic output/set/2
extern uint8_t			Dimmer3;		///< Output 3, set via topic output/set/3
extern uint8_t			Dimmer4;		///< Output 4, set via topic output/set/4


/**
 * @brief Ask the MQTT task to connect to or disconnect from the broker.
 * @param state True to request a connection, false to request a disconnect.
 *
 * The request bit is only posted when it makes sense. A request that is
 * redundant (already connected, already disconnected, or a disconnect that
 * is still pending) is logged as a warning and dropped.
 */
void request_mqtt(bool state)
{
    if (state) {
        if ((xEventGroupGetBits(xEventGroupMQTT) & TASK_MQTT_CONNECTED) != 0) {
            ESP_LOGW(TAG, "request_mqtt(true) but already connected");
            return;
        }
        xEventGroupSetBits(xEventGroupMQTT, TASK_MQTT_CONNECT);
        return;
    }

    /* Disconnect requested: a pending request wins over the connection state. */
    if ((xEventGroupGetBits(xEventGroupMQTT) & TASK_MQTT_DISCONNECT) != 0) {
        ESP_LOGW(TAG, "request_mqtt(false) already in progress");
        return;
    }
    if ((xEventGroupGetBits(xEventGroupMQTT) & TASK_MQTT_CONNECTED) == 0) {
        ESP_LOGW(TAG, "request_mqtt(false) but already disconnected");
        return;
    }
    xEventGroupSetBits(xEventGroupMQTT, TASK_MQTT_DISCONNECT);
}



bool ready_mqtt(void)
{
    if (xEventGroupGetBits(xEventGroupMQTT) & TASK_MQTT_CONNECTED)
        return true;
    return false;
}



/**
 * @brief Wait for the MQTT connection to go down.
 * @param time Maximum time to wait; it is divided by portTICK_PERIOD_MS to
 *             get the tick count for the wait.
 *
 * Returns at once when the client is not connected. Otherwise blocks until
 * the TASK_MQTT_DISCONNECTED bit is set or the timeout expires. The wait is
 * done with the clear-on-exit flag set for that bit.
 */
void wait_mqtt(int time)
{
    if ((xEventGroupGetBits(xEventGroupMQTT) & TASK_MQTT_CONNECTED) == 0) {
        return;
    }

    xEventGroupWaitBits(xEventGroupMQTT, TASK_MQTT_DISCONNECTED, pdTRUE, pdFALSE, time / portTICK_PERIOD_MS);
}



/**
 * @brief Generate the mqtt topic base part.
 * @return The topic string allocated in memory, to be released by the
 *         caller with free(). NULL when the build selects neither
 *         CONFIG_CODE_PRODUCTION nor CONFIG_CODE_TESTING.
 */
char *topic_base(void)
{
    /* Start from NULL so an unconfigured build never returns garbage. */
    char        *tmp = NULL;

    /*
     * Exactly one prefix is allocated. Testing is checked first so that a
     * build with both options set still gets the testing prefix, without
     * leaking the production string.
     */
#if defined(CONFIG_CODE_TESTING)
    tmp = xstrcpy((char *)"wemos/");
#elif defined(CONFIG_CODE_PRODUCTION)
    tmp = xstrcpy((char *)"balkon/");
#endif

    return tmp;
}



/**
 * @brief The mqtt generic publish function.
 * @param topic The topic of the mqtt message.
 * @param payload The payload of the mqtt message, or NULL to publish an
 *                empty message.
 *
 * The message is sent with QoS 1 and without the retain flag. The counter
 * of outstanding messages is raised before sending, because it is lowered
 * again by the MQTT_EVENT_PUBLISHED event. When the client refuses the
 * message no such event will follow, so the count is undone here.
 */
void publisher(char *topic, char *payload)
{
    bool    counted = false;
    int     msg_id;

    /*
     * First count, then send the data.
     */
    if (xSemaphoreTake(xSemaphorePcounter, 10) == pdTRUE) {
        count_pub++;
        counted = true;
        xSemaphoreGive(xSemaphorePcounter);
    } else {
        ESP_LOGE(TAG, "publisher() counter lock");
    }

    msg_id = esp_mqtt_client_publish(client, topic, payload, payload ? (int)strlen(payload) : 0, 1, 0);
    if (msg_id >= 0) {
        return;
    }

    /*
     * A negative message id means the message was not accepted.
     */
    ESP_LOGE(TAG, "publisher() publish %s failed (%d)", topic ? topic : "(null)", msg_id);
    if (! counted) {
        return;
    }
    if (xSemaphoreTake(xSemaphorePcounter, 10) == pdTRUE) {
        if (count_pub) {
            count_pub--;
        }
        xSemaphoreGive(xSemaphorePcounter);
    } else {
        ESP_LOGE(TAG, "publisher() counter lock");
    }
}



/**
 * @brief Placeholder for handling received event data.
 * @param check Not used.
 * @param topic Not used.
 * @param data Not used.
 * @return Always false.
 */
bool do_event_data(char *check, char *topic, char *data)
{
    (void)check;
    (void)topic;
    (void)data;

    return false;
}


/**
 * @brief Build the JSON status message and publish it on `<base>status`.
 *
 * Layout of the message:
 *
 *   {"system":{"battery":..,"alarm":..,"version":"..","rssi":..,"wifi":..,
 *              "temperature":..,
 *              "light":{"lux":..,"gain":..,"agl":..,"ch0":..,"ch1":..}},
 *    "solar":{"voltage":..,"current":..,"power":..},
 *    "battery":{"voltage":..,"current":..,"power":..},
 *    "real":{"current":..},
 *    "TB":{"temperature":..,"pressure":..},
 *    "output":{"relay1":..,"relay2":..,"dimmer3":..,"dimmer4":..}}
 *
 * Values guarded by a semaphore are only added when the lock is obtained
 * within 25 ticks. When a lock is not obtained the keys of that group are
 * left out completely, so the message always remains valid JSON.
 */
void publish(void)
{
    char                    *topic = NULL, *payload = NULL, buf[64];
    const esp_app_desc_t    *desc = esp_app_get_description();
    int                     Quality = 0;

    /*
     * System section.
     */
    payload = xstrcpy((char *)"{\"system\":{\"battery\":");
    snprintf(buf, sizeof(buf), "%d", batteryState);
    payload = xstrcat(payload, buf);
    payload = xstrcat(payload, (char *)",\"alarm\":");
    /* Alarm is an uint32_t, print it unsigned with a matching type. */
    snprintf(buf, sizeof(buf), "%lu", (unsigned long)Alarm);
    payload = xstrcat(payload, buf);
    payload = xstrcat(payload, (char *)",\"version\":\"");
    payload = xstrcat(payload, (char *)desc->version);
    payload = xstrcat(payload, (char *)"\"");
    if (xSemaphoreTake(xSemaphoreWiFi, 25) == pdTRUE) {
        payload = xstrcat(payload, (char *)",\"rssi\":");
        snprintf(buf, sizeof(buf), "%d", wifi_state->STA_rssi);
        payload = xstrcat(payload, buf);
        payload = xstrcat(payload, (char *)",\"wifi\":");
        /* Map the rssi range -100..-50 linear to a quality of 0..100. */
        if (wifi_state->STA_rssi <= -100) {
            Quality = 0;
        } else if (wifi_state->STA_rssi >= -50) {
            Quality = 100;
        } else {
            Quality = 2 * (wifi_state->STA_rssi + 100);
        }
        snprintf(buf, sizeof(buf), "%d", Quality);
        payload = xstrcat(payload, buf);
        xSemaphoreGive(xSemaphoreWiFi);
    }
    if (xSemaphoreTake(xSemaphoreTEMP, 25) == pdTRUE) {
        payload = xstrcat(payload, (char *)",\"temperature\":");
        snprintf(buf, sizeof(buf), "%.1f", temp_state->temperature);
        payload = xstrcat(payload, buf);
        xSemaphoreGive(xSemaphoreTEMP);
    }

    /*
     * Light sensor, the object stays empty if the sensor state is locked.
     */
    payload = xstrcat(payload, (char *)",\"light\":{");
    if (xSemaphoreTake(xSemaphoreAPDS9930, 25) == pdTRUE) {
        payload = xstrcat(payload, (char *)"\"lux\":");
        snprintf(buf, sizeof(buf), "%.1f", apds9930_state->ambient_light);
        payload = xstrcat(payload, buf);
        payload = xstrcat(payload, (char *)",\"gain\":");
        snprintf(buf, sizeof(buf), "%d", apds9930_state->gain);
        payload = xstrcat(payload, buf);
        payload = xstrcat(payload, (char *)",\"agl\":");
        snprintf(buf, sizeof(buf), "%d", apds9930_state->aglbit);
        payload = xstrcat(payload, buf);
        payload = xstrcat(payload, (char *)",\"ch0\":");
        snprintf(buf, sizeof(buf), "%d", apds9930_state->ch0);
        payload = xstrcat(payload, buf);
        payload = xstrcat(payload, (char *)",\"ch1\":");
        snprintf(buf, sizeof(buf), "%d", apds9930_state->ch1);
        payload = xstrcat(payload, buf);
        xSemaphoreGive(xSemaphoreAPDS9930);
    }

    /*
     * Solar and battery sections. Power is divided by 1000 for the report.
     */
    payload = xstrcat(payload, (char *)"}},\"solar\":{\"voltage\":");
    snprintf(buf, sizeof(buf), "%.2f", solarVolts);
    payload = xstrcat(payload, buf);
    payload = xstrcat(payload, (char *)",\"current\":");
    snprintf(buf, sizeof(buf), "%.1f", solarCurrent);
    payload = xstrcat(payload, buf);
    payload = xstrcat(payload, (char *)",\"power\":");
    snprintf(buf, sizeof(buf), "%.3f", solarPower / 1000.0);
    payload = xstrcat(payload, buf);
    payload = xstrcat(payload, (char *)"},\"battery\":{\"voltage\":");
    snprintf(buf, sizeof(buf), "%.2f", batteryVolts);
    payload = xstrcat(payload, buf);
    payload = xstrcat(payload, (char *)",\"current\":");
    snprintf(buf, sizeof(buf), "%.1f", batteryCurrent);
    payload = xstrcat(payload, buf);
    payload = xstrcat(payload, (char *)",\"power\":");
    snprintf(buf, sizeof(buf), "%.3f", batteryPower / 1000.0);
    payload = xstrcat(payload, buf);
    payload = xstrcat(payload, (char *)"},\"real\":{\"current\":");
    snprintf(buf, sizeof(buf), "%.1f", (0 - batteryCurrent) + solarCurrent);
    payload = xstrcat(payload, buf);

    /*
     * BMP280 section, the object stays empty if the sensor state is locked.
     */
    payload = xstrcat(payload, (char *)"},\"TB\":{");
    if (xSemaphoreTake(xSemaphoreBMP280, 25) == pdTRUE) {
        payload = xstrcat(payload, (char *)"\"temperature\":");
        snprintf(buf, sizeof(buf), "%.2f", bmp280_state->temperature);
        payload = xstrcat(payload, buf);
        payload = xstrcat(payload, (char *)",\"pressure\":");
        snprintf(buf, sizeof(buf), "%.1f", bmp280_state->pressure / 100.0);
        payload = xstrcat(payload, buf);
        xSemaphoreGive(xSemaphoreBMP280);
    }

    /*
     * Output section.
     */
    payload = xstrcat(payload, (char *)"},\"output\":{\"relay1\":");
    snprintf(buf, sizeof(buf), "%d", Relay1);
    payload = xstrcat(payload, buf);
    payload = xstrcat(payload, (char *)",\"relay2\":");
    snprintf(buf, sizeof(buf), "%d", Relay2);
    payload = xstrcat(payload, buf);
    payload = xstrcat(payload, (char *)",\"dimmer3\":");
    snprintf(buf, sizeof(buf), "%d", Dimmer3);
    payload = xstrcat(payload, buf);
    payload = xstrcat(payload, (char *)",\"dimmer4\":");
    snprintf(buf, sizeof(buf), "%d", Dimmer4);
    payload = xstrcat(payload, buf);
    payload = xstrcat(payload, (char *)"}}");

    topic = topic_base();
    topic = xstrcat(topic, (char *)"status");
    ESP_LOGI(TAG, "%s %s", topic, payload);
    publisher(topic, payload);
    free(topic);
    topic = NULL;
    free(payload);
    payload = NULL;
}



/**
 * @brief Test if a received topic is exactly the expected topic.
 * @param expect The expected topic, a zero terminated string.
 * @param topic The received topic, not zero terminated, may be NULL.
 * @param topic_len The length of the received topic.
 * @return True only if both length and contents are equal.
 */
static bool topic_equals(const char *expect, const char *topic, int topic_len)
{
    size_t  len = strlen(expect);

    if (topic == NULL || topic_len < 0 || (size_t)topic_len != len)
        return false;
    return strncmp(expect, topic, len) == 0;
}



/**
 * @brief Handle one event from the MQTT client.
 * @param event The event as delivered by the MQTT client.
 * @return Always ESP_OK.
 *
 * On connect the topic `<base>output/set/#` is subscribed and the
 * connection state bits are updated. Published events lower the counter
 * of outstanding messages. Data received on `<base>output/set/1` to
 * `<base>output/set/4` is converted with atoi(), stored in the matching
 * output variable and written to nvs under the key out1 .. out4.
 */
static esp_err_t mqtt_event_handler_cb(esp_mqtt_event_handle_t event)
{
    /* Output variables and their nvs keys, index 0 is topic .../set/1. */
    static uint8_t *const       outputs[] = { &Relay1, &Relay2, &Dimmer3, &Dimmer4 };
    static const char *const    keys[] = { "out1", "out2", "out3", "out4" };
    char        *subscr = NULL, *check = NULL;

    switch (event->event_id) {

        case MQTT_EVENT_CONNECTED: {
            ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED");
            subscr = topic_base();
            subscr = xstrcat(subscr, (char *)"output/set/#");
            int msgid = esp_mqtt_client_subscribe(client, subscr, 0);
            ESP_LOGD(TAG, "Subscribe `%s' id %d", subscr, msgid);
            free(subscr);
            subscr = NULL;
            xEventGroupSetBits(xEventGroupMQTT, TASK_MQTT_CONNECTED);
            xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_DISCONNECTED);
            break;
        }

        case MQTT_EVENT_DISCONNECTED:
            ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED");
            xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECTED);
            xEventGroupSetBits(xEventGroupMQTT, TASK_MQTT_DISCONNECTED);
            break;

        case MQTT_EVENT_SUBSCRIBED:
            ESP_LOGD(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d", event->msg_id);
            break;

        case MQTT_EVENT_UNSUBSCRIBED:
            ESP_LOGI(TAG, "MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event->msg_id);
            break;

        case MQTT_EVENT_PUBLISHED:
            if (xSemaphoreTake(xSemaphorePcounter, 10) == pdTRUE) {
                if (count_pub) {
                    count_pub--;
                }
                xSemaphoreGive(xSemaphorePcounter);
            } else {
                ESP_LOGE(TAG, "mqtt_event_handler_cb(() lock error event");
            }
            ESP_LOGI(TAG, "MQTT_EVENT_PUBLISHED, msg_id=%d, %d msgs in queue", event->msg_id, count_pub);
            break;

        case MQTT_EVENT_DATA: {
            char    data[65], topic[128];
            size_t  last;

            /*
             * Make zero terminated copies. Anything that doesn't fit,
             * or is missing, becomes an empty string.
             */
            if (event->data != NULL && event->data_len < (int)sizeof(data))
                snprintf(data, sizeof(data), "%.*s", event->data_len, event->data);
            else
                data[0] = '\0';
            if (event->topic != NULL && event->topic_len < (int)sizeof(topic))
                snprintf(topic, sizeof(topic), "%.*s", event->topic_len, event->topic);
            else
                topic[0] = '\0';
            ESP_LOGI(TAG, "MQTT_EVENT_DATA %s %s", topic, data);

            /*
             * Build `<base>output/set/1' and replace the last character
             * for each output. The topic must match completely; a shorter
             * or empty topic must never select an output.
             */
            check = topic_base();
            check = xstrcat(check, (char *)"output/set/1");
            last = strlen(check) - 1;
            for (size_t i = 0; i < sizeof(outputs) / sizeof(outputs[0]); i++) {
                check[last] = (char)('1' + i);
                if (topic_equals(check, event->topic, event->topic_len)) {
                    ESP_LOGD(TAG, "Got %s `%s' %d", check, data, atoi(data));
                    *outputs[i] = (uint8_t)atoi(data);
                    nvsio_write_u8((char *)keys[i], *outputs[i]);
                }
            }
            free(check);
            check = NULL;
            break;
        }

        case MQTT_EVENT_ERROR:
            ESP_LOGI(TAG, "MQTT_EVENT_ERROR");
            break;

        case MQTT_EVENT_BEFORE_CONNECT:
            ESP_LOGD(TAG, "MQTT_EVENT_BEFORE_CONNECT");
            // Configure connection can be here.
            break;

        default:
            ESP_LOGE(TAG, "Other event id:%d", event->event_id);
            break;
    }
    return ESP_OK;
}



/**
 * @brief Event loop entry point, hands the event data to mqtt_event_handler_cb().
 * @param handler_args Not used.
 * @param base Not used.
 * @param event_id Not used.
 * @param event_data The event, passed on unchanged.
 */
static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data)
{
    (void)handler_args;
    (void)base;
    (void)event_id;

    mqtt_event_handler_cb(event_data);
}



/**
 * @brief Task that connects to and disconnects from the MQTT broker on request.
 * @param pvParameter Not used.
 *
 * Creates the publish counter mutex, the MQTT event group and the MQTT
 * client. Then it waits forever for the TASK_MQTT_CONNECT and
 * TASK_MQTT_DISCONNECT request bits and handles one request per loop.
 * If both bits are set the connect request is handled first.
 */
void task_mqtt(void *pvParameter)
{
    esp_err_t   err;
    char        *uri = NULL, port[11];
    EventBits_t uxBits;

    ESP_LOGI(TAG, "Starting MQTT task");
    esp_log_level_set("MQTT_CLIENT", ESP_LOG_ERROR);
    xSemaphorePcounter = xSemaphoreCreateMutex();

    /* Event group for the MQTT state, the initial state is disconnected. */
    xEventGroupMQTT = xEventGroupCreate();
    xEventGroupSetBits(xEventGroupMQTT, TASK_MQTT_DISCONNECTED);

    /* Dummy uri, the real one is set with each connect request. */
    esp_mqtt_client_config_t mqtt_cfg = {
        .broker.address.uri = "mqtt://localhost",
    };
    client = esp_mqtt_client_init(&mqtt_cfg);
    esp_mqtt_client_register_event(client, ESP_EVENT_ANY_ID, mqtt_event_handler, client);

    /*
     * Task loop forever.
     */
    while (1) {

        uxBits = xEventGroupWaitBits(xEventGroupMQTT, TASK_MQTT_CONNECT | TASK_MQTT_DISCONNECT, pdFALSE, pdFALSE, portMAX_DELAY );

        if (uxBits & TASK_MQTT_CONNECT) {
            /*
             * First build the connect uri:
             *   mqtt://[user:pass@]server[:port]
             * The port is only added if it is not 1883.
             */
            uri = xstrcpy((char *)"mqtt://");
            if (strlen(CONFIG_MQTT_USER) && strlen(CONFIG_MQTT_PASS)) {
                uri = xstrcat(uri, CONFIG_MQTT_USER);
                uri = xstrcat(uri, (char *)":");
                uri = xstrcat(uri, CONFIG_MQTT_PASS);
                uri = xstrcat(uri, (char *)"@");
            }
            uri = xstrcat(uri, CONFIG_MQTT_SERVER);
            if (CONFIG_MQTT_PORT != 1883) {
                uri = xstrcat(uri, (char *)":");
                snprintf(port, sizeof(port), "%d", CONFIG_MQTT_PORT);
                uri = xstrcat(uri, port);
            }
            /* NOTE(review): this logs the password if one is configured. */
            ESP_LOGI(TAG, "Request MQTT connect %s", uri);

            /*
             * Connect to the broker.
             */
            err = esp_mqtt_client_set_uri(client, uri);
            if (err != ESP_OK)
                ESP_LOGE(TAG, "Set uri %s", esp_err_to_name(err));
            /*
             * The client keeps its own copy of the uri parts, so release
             * our string, else it leaks with every connect request.
             */
            free(uri);
            uri = NULL;
            err = esp_mqtt_client_start(client);
            if (err != ESP_OK) {
                ESP_LOGE(TAG, "Result %s", esp_err_to_name(err));
            }
            xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECT);

        } else if (uxBits & TASK_MQTT_DISCONNECT) {
            ESP_LOGI(TAG, "Request MQTT disconnect");
            /*
             * Unsubscribe if connected
             */
            if (ready_mqtt()) {
                char *topic = topic_base();
                topic = xstrcat(topic, (char *)"output/set/#");
                esp_mqtt_client_unsubscribe(client, topic);
                free(topic);
            }

            /*
             * Disconnect from broker and wait until confirmed.
             */
            err = esp_mqtt_client_disconnect(client);
            if (err != ESP_OK) {
                ESP_LOGE(TAG, "Result %s", esp_err_to_name(err));
            }
            xEventGroupWaitBits(xEventGroupMQTT, TASK_MQTT_DISCONNECTED, pdTRUE, pdFALSE, 500 / portTICK_PERIOD_MS);

            /*
             * Finally stop the client because new connections start
             * with a 'esp_mqtt_client_start()' command.
             * This will take about 5 seconds, but we don't need the network.
             */
            err = esp_mqtt_client_stop(client);
            if (err != ESP_OK) {
                ESP_LOGE(TAG, "esp_mqtt_client_stop() result %s", esp_err_to_name(err));
            }

            /* Force the state bits to disconnected, confirmed or not. */
            xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECTED);
            xEventGroupSetBits(xEventGroupMQTT, TASK_MQTT_DISCONNECTED);
            xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_DISCONNECT);
            ESP_LOGI(TAG, "Request MQTT disconnect done");
        }
    }
}

mercurial