main/task_mqtt.c

changeset 5
b1f38105ca7e
child 6
bad3414f7bc4
equal deleted inserted replaced
4:d0155c16e992 5:b1f38105ca7e
1 /**
2 * @file task_mqtt.c
3 * @brief The FreeRTOS task to maintain MQTT connections.
4 */
5
6
7 #include "config.h"
8
9
10 static const char *TAG = "task_mqtt";
11
12 EventGroupHandle_t xEventGroupMQTT; ///< Events MQTT task
13 SemaphoreHandle_t xSemaphorePcounter; ///< Publish counter semaphore.
14 int count_pub = 0; ///< Outstanding published messages.
15 esp_mqtt_client_handle_t client; ///< MQTT client handle
16
17 const int TASK_MQTT_CONNECT = BIT0; ///< Request MQTT connection
18 const int TASK_MQTT_DISCONNECT = BIT1; ///< Request MQTT disconnect
19 const int TASK_MQTT_CONNECTED = BIT2; ///< MQTT is connected
20
21 const char *sensState[] = { "OK", "ERROR" }; ///< Sensor state strings
22 const char *unitMode[] = { "OFF", "ON" }; ///< Units state strings
23
24 extern BMP280_State *bmp280_state; ///< BMP280 state
25 extern SemaphoreHandle_t xSemaphoreBMP280; ///< BMP280 lock semaphore
26 extern INA219_State *ina219_state; ///< INA219 state
27 extern SemaphoreHandle_t xSemaphoreINA219; ///< INA219 lock semaphore
28 extern WIFI_State *wifi_state; ///< WiFi state
29 extern SemaphoreHandle_t xSemaphoreWiFi; ///< WiFi lock semaphore
30 extern const esp_app_desc_t *app_desc;
31
32 extern uint32_t Alarm;
33 extern float batteryState;
34 extern float batteryVolts;
35 extern float batteryCurrent;
36 extern float batteryPower;
37 extern float solarVolts;
38 extern float solarCurrent;
39 extern float solarPower;
40
41
42 void connect_mqtt(bool state)
43 {
44 if (state)
45 xEventGroupSetBits(xEventGroupMQTT, TASK_MQTT_CONNECT);
46 else
47 xEventGroupSetBits(xEventGroupMQTT, TASK_MQTT_DISCONNECT);
48 }
49
50
51
52 bool ready_mqtt(void)
53 {
54 if (xEventGroupGetBits(xEventGroupMQTT) & TASK_MQTT_CONNECTED)
55 return true;
56 return false;
57 }
58
59
60
/**
 * @brief Build the opening part of an MQTT payload.
 * @return The text {"metric": as a string obtained from xstrcpy(). The caller
 *         owns it; publish() in this file releases it with free().
 */
char *payload_header(void)
{
    return xstrcpy((char *)"{\"metric\":");
}
72
73
74
/**
 * @brief Generate the base part of the MQTT topic.
 *
 * The prefix depends on the build variant: "wemos/" for CONFIG_CODE_TESTING
 * and "balkon/" for CONFIG_CODE_PRODUCTION. When both are defined the testing
 * prefix is used, which is what the previous code ended up returning, but
 * without leaking the production string first.
 *
 * @return The topic prefix as a string obtained from xstrcpy(), owned by the
 *         caller, or NULL when neither build variant is configured.
 *         NOTE(review): publish() hands the result straight to xstrcat();
 *         confirm that xstrcat() accepts a NULL first argument.
 */
char *topic_base(void)
{
    /* Start from a defined value so a misconfigured build never returns garbage. */
    char *tmp = NULL;

#if defined(CONFIG_CODE_TESTING)
    tmp = xstrcpy((char *)"wemos/");
#elif defined(CONFIG_CODE_PRODUCTION)
    tmp = xstrcpy((char *)"balkon/");
#endif

    return tmp;
}
92
93
94
95 /**
96 * @brief The mqtt generic publish function.
97 * @param topic The topic of the mqtt message.
98 * @param payload The payload of the mqtt message.
99 */
100 void publisher(char *topic, char *payload)
101 {
102 /*
103 * First count, then sent the data.
104 */
105 if (xSemaphoreTake(xSemaphorePcounter, 10) == pdTRUE) {
106 count_pub++;
107 xSemaphoreGive(xSemaphorePcounter);
108 } else {
109 ESP_LOGE(TAG, "publisher() counter lock");
110 }
111
112 if (payload)
113 esp_mqtt_client_publish(client, topic, payload, strlen(payload), 1, 0);
114 else
115 esp_mqtt_client_publish(client, topic, NULL, 0, 1, 0);
116 }
117
118
119
120 void publish(void)
121 {
122 char *topic = NULL, *payload = NULL, buf[64];
123
124 // {"system":{"battery":70,"alarm":0,"version":"0.2.6","rssi":-56,"wifi":88,"light":{"lux":12.34,"gain":2}},"solar":{"voltage":13.98,"current":234.1,"power":3.272718},"battery":{"voltage":13.21,"current":4.942289,"power":0.065288},"real":{"current":229.1577},"TH":{"temperature":20.2,"humidity":48.3},"output":{"relay1":0,"relay2":0,"dimmer3":0,"dimmer4":0}}
125 //
126 payload = payload_header();
127 payload = xstrcat(payload, (char *)"{\"system\":{\"battery\":");
128 sprintf(buf, "%.0f", batteryState);
129 payload = xstrcat(payload, buf);
130 payload = xstrcat(payload, (char *)",\"alarm\":");
131 sprintf(buf, "%ld", Alarm);
132 payload = xstrcat(payload, buf);
133 payload = xstrcat(payload, (char *)",\"version\":\"");
134 payload = xstrcat(payload, (char *)app_desc->version);
135 payload = xstrcat(payload, (char *)",\"rssi\":");
136
137 payload = xstrcat(payload, (char *)",\"light\":{\"lux\":");
138
139 payload = xstrcat(payload, (char *)",\"gain\":");
140
141 payload = xstrcat(payload, (char *)"}},\"solar\":{\"voltage\":");
142 sprintf(buf, "%.2f", solarVolts);
143 payload = xstrcat(payload, buf);
144 payload = xstrcat(payload, (char *)",\"current\":");
145 sprintf(buf, "%.1f", solarCurrent);
146 payload = xstrcat(payload, buf);
147 payload = xstrcat(payload, (char *)",\"power\":");
148 sprintf(buf, "%.3f", solarPower / 1000.0);
149 payload = xstrcat(payload, buf);
150 payload = xstrcat(payload, (char *)"},\"battery\":{\"voltage\":");
151 sprintf(buf, "%.2f", batteryVolts);
152 payload = xstrcat(payload, buf);
153 payload = xstrcat(payload, (char *)",\"current\":");
154 sprintf(buf, "%.1f", batteryCurrent);
155 payload = xstrcat(payload, buf);
156 payload = xstrcat(payload, (char *)",\"power\":");
157 sprintf(buf, "%.3f", batteryPower / 1000.0);
158 payload = xstrcat(payload, buf);
159 payload = xstrcat(payload, (char *)"},\"real\":{\"current\":");
160 payload = xstrcat(payload, (char *)"},\"TB\":{\"temperature\":");
161 if (xSemaphoreTake(xSemaphoreBMP280, 25) == pdTRUE) {
162 sprintf(buf, "%.2f", bmp280_state->temperature);
163 payload = xstrcat(payload, buf);
164 payload = xstrcat(payload, (char *)",\"pressure\":");
165 sprintf(buf, "%.1f", bmp280_state->pressure / 100.0);
166 payload = xstrcat(payload, buf);
167 xSemaphoreGive(xSemaphoreBMP280);
168 }
169 payload = xstrcat(payload, (char *)"},\"output\":{\"relay1\":");
170
171 payload = xstrcat(payload, (char *)",\"relay2\":");
172
173 payload = xstrcat(payload, (char *)",\"dimmer3\":");
174
175 payload = xstrcat(payload, (char *)",\"dimmer4\":");
176
177 payload = xstrcat(payload, (char *)"}}");
178 topic = topic_base();
179 topic = xstrcat(topic, (char *)"status");
180 publisher(topic, payload);
181 free(topic);
182 topic = NULL;
183 free(payload);
184 payload = NULL;
185 }
186
187
188
189 static esp_err_t mqtt_event_handler_cb(esp_mqtt_event_handle_t event)
190 {
191 switch (event->event_id) {
192
193 case MQTT_EVENT_CONNECTED:
194 ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED");
195 xEventGroupSetBits(xEventGroupMQTT, TASK_MQTT_CONNECTED);
196 break;
197
198 case MQTT_EVENT_DISCONNECTED:
199 ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED");
200 xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECTED);
201 break;
202
203 case MQTT_EVENT_SUBSCRIBED:
204 ESP_LOGI(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d", event->msg_id);
205 break;
206
207 case MQTT_EVENT_UNSUBSCRIBED:
208 ESP_LOGI(TAG, "MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event->msg_id);
209 break;
210
211 case MQTT_EVENT_PUBLISHED:
212 ESP_LOGI(TAG, "MQTT_EVENT_PUBLISHED, msg_id=%d", event->msg_id);
213 if (xSemaphoreTake(xSemaphorePcounter, 10) == pdTRUE) {
214 if (count_pub) {
215 count_pub--;
216 }
217 xSemaphoreGive(xSemaphorePcounter);
218 } else {
219 ESP_LOGE(TAG, "mqtt_event_handler_cb(() lock error event");
220 }
221 break;
222
223 case MQTT_EVENT_DATA:
224 ESP_LOGI(TAG, "MQTT_EVENT_DATA");
225 printf("TOPIC=%.*s\r\n", event->topic_len, event->topic);
226 printf("DATA=%.*s\r\n", event->data_len, event->data);
227 break;
228
229 case MQTT_EVENT_ERROR:
230 ESP_LOGI(TAG, "MQTT_EVENT_ERROR");
231 break;
232
233 case MQTT_EVENT_BEFORE_CONNECT:
234 ESP_LOGI(TAG, "MQTT_EVENT_BEFORE_CONNECT");
235 // Configure connection can be here.
236 break;
237
238 default:
239 ESP_LOGI(TAG, "Other event id:%d", event->event_id);
240 break;
241 }
242 return ESP_OK;
243 }
244
245
246
247 static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data) {
248 mqtt_event_handler_cb(event_data);
249 }
250
251
252
253 /*
254 * Task to read temperature sensors on request.
255 */
256 void task_mqtt(void *pvParameter)
257 {
258 esp_err_t err;
259 char *uri = NULL, port[11];
260
261 ESP_LOGI(TAG, "Starting MQTT task");
262 xSemaphorePcounter = xSemaphoreCreateMutex();
263
264 /* event handler and event group for the wifi driver */
265 xEventGroupMQTT = xEventGroupCreate();
266 EventBits_t uxBits;
267
268 esp_mqtt_client_config_t mqtt_cfg = {
269 .broker.address.uri = "mqtt://localhost",
270 };
271 client = esp_mqtt_client_init(&mqtt_cfg);
272 esp_mqtt_client_register_event(client, ESP_EVENT_ANY_ID, mqtt_event_handler, client);
273
274 /*
275 * Task loop forever.
276 */
277 while (1) {
278
279 uxBits = xEventGroupWaitBits(xEventGroupMQTT, TASK_MQTT_CONNECT | TASK_MQTT_DISCONNECT, pdFALSE, pdFALSE, portMAX_DELAY );
280
281 if (uxBits & TASK_MQTT_CONNECT) {
282 uri = xstrcpy((char *)"mqtt://");
283 if (strlen(CONFIG_MQTT_USER) && strlen(CONFIG_MQTT_PASS)) {
284 uri = xstrcat(uri, CONFIG_MQTT_USER);
285 uri = xstrcat(uri, (char *)":");
286 uri = xstrcat(uri, CONFIG_MQTT_PASS);
287 uri = xstrcat(uri, (char *)"@");
288 }
289 uri = xstrcat(uri, CONFIG_MQTT_SERVER);
290 if (CONFIG_MQTT_PORT != 1883) {
291 uri = xstrcat(uri, (char *)":");
292 sprintf(port, "%d", CONFIG_MQTT_PORT);
293 uri = xstrcat(uri, port);
294 }
295
296 ESP_LOGI(TAG, "Request MQTT connect %s", uri);
297 err = esp_mqtt_client_set_uri(client, uri);
298 if (err != ESP_OK)
299 ESP_LOGE(TAG, "Set uri %s", esp_err_to_name(err));
300 err = esp_mqtt_client_start(client);
301 if (err != ESP_OK)
302 ESP_LOGE(TAG, "Result %s", esp_err_to_name(err));
303 xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECT);
304
305 } else if (uxBits & TASK_MQTT_DISCONNECT) {
306 ESP_LOGI(TAG, "Request MQTT disconnect");
307 esp_mqtt_client_stop(client);
308 xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_DISCONNECT);
309 xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECTED);
310 }
311 }
312 }
313

mercurial