|
1 /** |
|
2 * @file task_mqtt.c |
|
3 * @brief The FreeRTOS task to maintain MQTT connections. |
|
4 */ |
|
5 |
|
6 |
|
7 #include "config.h" |
|
8 |
|
9 |
|
10 static const char *TAG = "task_mqtt"; |
|
11 |
|
12 EventGroupHandle_t xEventGroupMQTT; ///< Events MQTT task |
|
13 SemaphoreHandle_t xSemaphorePcounter; ///< Publish counter semaphore. |
|
14 int count_pub = 0; ///< Outstanding published messages. |
|
15 esp_mqtt_client_handle_t client; ///< MQTT client handle |
|
16 |
|
17 const int TASK_MQTT_CONNECT = BIT0; ///< Request MQTT connection |
|
18 const int TASK_MQTT_DISCONNECT = BIT1; ///< Request MQTT disconnect |
|
19 const int TASK_MQTT_CONNECTED = BIT2; ///< MQTT is connected |
|
20 |
|
21 const char *sensState[] = { "OK", "ERROR" }; ///< Sensor state strings |
|
22 const char *unitMode[] = { "OFF", "ON" }; ///< Units state strings |
|
23 |
|
24 extern BMP280_State *bmp280_state; ///< BMP280 state |
|
25 extern SemaphoreHandle_t xSemaphoreBMP280; ///< BMP280 lock semaphore |
|
26 extern INA219_State *ina219_state; ///< INA219 state |
|
27 extern SemaphoreHandle_t xSemaphoreINA219; ///< INA219 lock semaphore |
|
28 extern WIFI_State *wifi_state; ///< WiFi state |
|
29 extern SemaphoreHandle_t xSemaphoreWiFi; ///< WiFi lock semaphore |
|
30 extern const esp_app_desc_t *app_desc; |
|
31 |
|
32 extern uint32_t Alarm; |
|
33 extern float batteryState; |
|
34 extern float batteryVolts; |
|
35 extern float batteryCurrent; |
|
36 extern float batteryPower; |
|
37 extern float solarVolts; |
|
38 extern float solarCurrent; |
|
39 extern float solarPower; |
|
40 |
|
41 |
|
42 void connect_mqtt(bool state) |
|
43 { |
|
44 if (state) |
|
45 xEventGroupSetBits(xEventGroupMQTT, TASK_MQTT_CONNECT); |
|
46 else |
|
47 xEventGroupSetBits(xEventGroupMQTT, TASK_MQTT_DISCONNECT); |
|
48 } |
|
49 |
|
50 |
|
51 |
|
52 bool ready_mqtt(void) |
|
53 { |
|
54 if (xEventGroupGetBits(xEventGroupMQTT) & TASK_MQTT_CONNECTED) |
|
55 return true; |
|
56 return false; |
|
57 } |
|
58 |
|
59 |
|
60 |
|
61 /** |
|
62 * @brief Generate the mqtt payload header. |
|
63 * @return Allocated character string with the header. |
|
64 */ |
|
65 char *payload_header(void) |
|
66 { |
|
67 char *tmp; |
|
68 |
|
69 tmp = xstrcpy((char *)"{\"metric\":"); |
|
70 return tmp; |
|
71 } |
|
72 |
|
73 |
|
74 |
|
75 /** |
|
76 * @brief Generate the mqtt topic base part. |
|
77 * @return The topic string allocated in memory. |
|
78 */ |
|
79 char *topic_base(void) |
|
80 { |
|
81 char *tmp; |
|
82 |
|
83 #ifdef CONFIG_CODE_PRODUCTION |
|
84 tmp = xstrcpy((char *)"balkon/"); |
|
85 #endif |
|
86 #ifdef CONFIG_CODE_TESTING |
|
87 tmp = xstrcpy((char *)"wemos/"); |
|
88 #endif |
|
89 |
|
90 return tmp; |
|
91 } |
|
92 |
|
93 |
|
94 |
|
95 /** |
|
96 * @brief The mqtt generic publish function. |
|
97 * @param topic The topic of the mqtt message. |
|
98 * @param payload The payload of the mqtt message. |
|
99 */ |
|
100 void publisher(char *topic, char *payload) |
|
101 { |
|
102 /* |
|
103 * First count, then sent the data. |
|
104 */ |
|
105 if (xSemaphoreTake(xSemaphorePcounter, 10) == pdTRUE) { |
|
106 count_pub++; |
|
107 xSemaphoreGive(xSemaphorePcounter); |
|
108 } else { |
|
109 ESP_LOGE(TAG, "publisher() counter lock"); |
|
110 } |
|
111 |
|
112 if (payload) |
|
113 esp_mqtt_client_publish(client, topic, payload, strlen(payload), 1, 0); |
|
114 else |
|
115 esp_mqtt_client_publish(client, topic, NULL, 0, 1, 0); |
|
116 } |
|
117 |
|
118 |
|
119 |
|
120 void publish(void) |
|
121 { |
|
122 char *topic = NULL, *payload = NULL, buf[64]; |
|
123 |
|
124 // {"system":{"battery":70,"alarm":0,"version":"0.2.6","rssi":-56,"wifi":88,"light":{"lux":12.34,"gain":2}},"solar":{"voltage":13.98,"current":234.1,"power":3.272718},"battery":{"voltage":13.21,"current":4.942289,"power":0.065288},"real":{"current":229.1577},"TH":{"temperature":20.2,"humidity":48.3},"output":{"relay1":0,"relay2":0,"dimmer3":0,"dimmer4":0}} |
|
125 // |
|
126 payload = payload_header(); |
|
127 payload = xstrcat(payload, (char *)"{\"system\":{\"battery\":"); |
|
128 sprintf(buf, "%.0f", batteryState); |
|
129 payload = xstrcat(payload, buf); |
|
130 payload = xstrcat(payload, (char *)",\"alarm\":"); |
|
131 sprintf(buf, "%ld", Alarm); |
|
132 payload = xstrcat(payload, buf); |
|
133 payload = xstrcat(payload, (char *)",\"version\":\""); |
|
134 payload = xstrcat(payload, (char *)app_desc->version); |
|
135 payload = xstrcat(payload, (char *)",\"rssi\":"); |
|
136 |
|
137 payload = xstrcat(payload, (char *)",\"light\":{\"lux\":"); |
|
138 |
|
139 payload = xstrcat(payload, (char *)",\"gain\":"); |
|
140 |
|
141 payload = xstrcat(payload, (char *)"}},\"solar\":{\"voltage\":"); |
|
142 sprintf(buf, "%.2f", solarVolts); |
|
143 payload = xstrcat(payload, buf); |
|
144 payload = xstrcat(payload, (char *)",\"current\":"); |
|
145 sprintf(buf, "%.1f", solarCurrent); |
|
146 payload = xstrcat(payload, buf); |
|
147 payload = xstrcat(payload, (char *)",\"power\":"); |
|
148 sprintf(buf, "%.3f", solarPower / 1000.0); |
|
149 payload = xstrcat(payload, buf); |
|
150 payload = xstrcat(payload, (char *)"},\"battery\":{\"voltage\":"); |
|
151 sprintf(buf, "%.2f", batteryVolts); |
|
152 payload = xstrcat(payload, buf); |
|
153 payload = xstrcat(payload, (char *)",\"current\":"); |
|
154 sprintf(buf, "%.1f", batteryCurrent); |
|
155 payload = xstrcat(payload, buf); |
|
156 payload = xstrcat(payload, (char *)",\"power\":"); |
|
157 sprintf(buf, "%.3f", batteryPower / 1000.0); |
|
158 payload = xstrcat(payload, buf); |
|
159 payload = xstrcat(payload, (char *)"},\"real\":{\"current\":"); |
|
160 payload = xstrcat(payload, (char *)"},\"TB\":{\"temperature\":"); |
|
161 if (xSemaphoreTake(xSemaphoreBMP280, 25) == pdTRUE) { |
|
162 sprintf(buf, "%.2f", bmp280_state->temperature); |
|
163 payload = xstrcat(payload, buf); |
|
164 payload = xstrcat(payload, (char *)",\"pressure\":"); |
|
165 sprintf(buf, "%.1f", bmp280_state->pressure / 100.0); |
|
166 payload = xstrcat(payload, buf); |
|
167 xSemaphoreGive(xSemaphoreBMP280); |
|
168 } |
|
169 payload = xstrcat(payload, (char *)"},\"output\":{\"relay1\":"); |
|
170 |
|
171 payload = xstrcat(payload, (char *)",\"relay2\":"); |
|
172 |
|
173 payload = xstrcat(payload, (char *)",\"dimmer3\":"); |
|
174 |
|
175 payload = xstrcat(payload, (char *)",\"dimmer4\":"); |
|
176 |
|
177 payload = xstrcat(payload, (char *)"}}"); |
|
178 topic = topic_base(); |
|
179 topic = xstrcat(topic, (char *)"status"); |
|
180 publisher(topic, payload); |
|
181 free(topic); |
|
182 topic = NULL; |
|
183 free(payload); |
|
184 payload = NULL; |
|
185 } |
|
186 |
|
187 |
|
188 |
|
189 static esp_err_t mqtt_event_handler_cb(esp_mqtt_event_handle_t event) |
|
190 { |
|
191 switch (event->event_id) { |
|
192 |
|
193 case MQTT_EVENT_CONNECTED: |
|
194 ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED"); |
|
195 xEventGroupSetBits(xEventGroupMQTT, TASK_MQTT_CONNECTED); |
|
196 break; |
|
197 |
|
198 case MQTT_EVENT_DISCONNECTED: |
|
199 ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED"); |
|
200 xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECTED); |
|
201 break; |
|
202 |
|
203 case MQTT_EVENT_SUBSCRIBED: |
|
204 ESP_LOGI(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d", event->msg_id); |
|
205 break; |
|
206 |
|
207 case MQTT_EVENT_UNSUBSCRIBED: |
|
208 ESP_LOGI(TAG, "MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event->msg_id); |
|
209 break; |
|
210 |
|
211 case MQTT_EVENT_PUBLISHED: |
|
212 ESP_LOGI(TAG, "MQTT_EVENT_PUBLISHED, msg_id=%d", event->msg_id); |
|
213 if (xSemaphoreTake(xSemaphorePcounter, 10) == pdTRUE) { |
|
214 if (count_pub) { |
|
215 count_pub--; |
|
216 } |
|
217 xSemaphoreGive(xSemaphorePcounter); |
|
218 } else { |
|
219 ESP_LOGE(TAG, "mqtt_event_handler_cb(() lock error event"); |
|
220 } |
|
221 break; |
|
222 |
|
223 case MQTT_EVENT_DATA: |
|
224 ESP_LOGI(TAG, "MQTT_EVENT_DATA"); |
|
225 printf("TOPIC=%.*s\r\n", event->topic_len, event->topic); |
|
226 printf("DATA=%.*s\r\n", event->data_len, event->data); |
|
227 break; |
|
228 |
|
229 case MQTT_EVENT_ERROR: |
|
230 ESP_LOGI(TAG, "MQTT_EVENT_ERROR"); |
|
231 break; |
|
232 |
|
233 case MQTT_EVENT_BEFORE_CONNECT: |
|
234 ESP_LOGI(TAG, "MQTT_EVENT_BEFORE_CONNECT"); |
|
235 // Configure connection can be here. |
|
236 break; |
|
237 |
|
238 default: |
|
239 ESP_LOGI(TAG, "Other event id:%d", event->event_id); |
|
240 break; |
|
241 } |
|
242 return ESP_OK; |
|
243 } |
|
244 |
|
245 |
|
246 |
|
247 static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data) { |
|
248 mqtt_event_handler_cb(event_data); |
|
249 } |
|
250 |
|
251 |
|
252 |
|
253 /* |
|
254 * Task to read temperature sensors on request. |
|
255 */ |
|
256 void task_mqtt(void *pvParameter) |
|
257 { |
|
258 esp_err_t err; |
|
259 char *uri = NULL, port[11]; |
|
260 |
|
261 ESP_LOGI(TAG, "Starting MQTT task"); |
|
262 xSemaphorePcounter = xSemaphoreCreateMutex(); |
|
263 |
|
264 /* event handler and event group for the wifi driver */ |
|
265 xEventGroupMQTT = xEventGroupCreate(); |
|
266 EventBits_t uxBits; |
|
267 |
|
268 esp_mqtt_client_config_t mqtt_cfg = { |
|
269 .broker.address.uri = "mqtt://localhost", |
|
270 }; |
|
271 client = esp_mqtt_client_init(&mqtt_cfg); |
|
272 esp_mqtt_client_register_event(client, ESP_EVENT_ANY_ID, mqtt_event_handler, client); |
|
273 |
|
274 /* |
|
275 * Task loop forever. |
|
276 */ |
|
277 while (1) { |
|
278 |
|
279 uxBits = xEventGroupWaitBits(xEventGroupMQTT, TASK_MQTT_CONNECT | TASK_MQTT_DISCONNECT, pdFALSE, pdFALSE, portMAX_DELAY ); |
|
280 |
|
281 if (uxBits & TASK_MQTT_CONNECT) { |
|
282 uri = xstrcpy((char *)"mqtt://"); |
|
283 if (strlen(CONFIG_MQTT_USER) && strlen(CONFIG_MQTT_PASS)) { |
|
284 uri = xstrcat(uri, CONFIG_MQTT_USER); |
|
285 uri = xstrcat(uri, (char *)":"); |
|
286 uri = xstrcat(uri, CONFIG_MQTT_PASS); |
|
287 uri = xstrcat(uri, (char *)"@"); |
|
288 } |
|
289 uri = xstrcat(uri, CONFIG_MQTT_SERVER); |
|
290 if (CONFIG_MQTT_PORT != 1883) { |
|
291 uri = xstrcat(uri, (char *)":"); |
|
292 sprintf(port, "%d", CONFIG_MQTT_PORT); |
|
293 uri = xstrcat(uri, port); |
|
294 } |
|
295 |
|
296 ESP_LOGI(TAG, "Request MQTT connect %s", uri); |
|
297 err = esp_mqtt_client_set_uri(client, uri); |
|
298 if (err != ESP_OK) |
|
299 ESP_LOGE(TAG, "Set uri %s", esp_err_to_name(err)); |
|
300 err = esp_mqtt_client_start(client); |
|
301 if (err != ESP_OK) |
|
302 ESP_LOGE(TAG, "Result %s", esp_err_to_name(err)); |
|
303 xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECT); |
|
304 |
|
305 } else if (uxBits & TASK_MQTT_DISCONNECT) { |
|
306 ESP_LOGI(TAG, "Request MQTT disconnect"); |
|
307 esp_mqtt_client_stop(client); |
|
308 xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_DISCONNECT); |
|
309 xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECTED); |
|
310 } |
|
311 } |
|
312 } |
|
313 |