|
1 /** |
|
2 * @file task_mqtt.c |
|
3 * @brief The FreeRTOS task to maintain MQTT connections. |
|
4 */ |
|
5 |
|
6 |
|
7 #include "config.h" |
|
8 |
|
9 |
|
10 static const char *TAG = "task_mqtt"; |
|
11 |
|
12 EventGroupHandle_t xEventGroupMQTT; ///< Events MQTT task |
|
13 |
|
14 uint64_t Sequence = 0; ///< Sequence stored in NVS |
|
15 nvs_handle_t seq_handle; ///< NVS handle |
|
16 |
|
17 esp_mqtt_client_handle_t client; ///< MQTT client handle |
|
18 const int TASK_MQTT_CONNECT = BIT0; ///< Request MQTT connection |
|
19 const int TASK_MQTT_DISCONNECT = BIT1; ///< Request MQTT disconnect |
|
20 const int TASK_MQTT_CONNECTED = BIT2; ///< MQTT is connected |
|
21 |
|
22 const char *sensState[] = { "Ok", "Error" }; |
|
23 const char *unitMode[] = { "Off", "On" }; |
|
24 |
|
25 extern DS18B20_State *ds18b20_state; ///< DS18B20 state |
|
26 extern SemaphoreHandle_t xSemaphoreDS18B20; ///< DS18B20 lock semaphore |
|
27 extern ADC_State *adc_state; ///< ADC state |
|
28 extern SemaphoreHandle_t xSemaphoreADC; ///< ADC lock semaphore |
|
29 extern WIFI_State *wifi_state; ///< WiFi state |
|
30 extern SemaphoreHandle_t xSemaphoreWiFi; ///< WiFi lock semaphore |
|
31 |
|
32 extern unit_t units[3]; |
|
33 extern const esp_app_desc_t *app_desc; |
|
34 |
|
35 |
|
36 |
|
37 void connect_mqtt(bool state) |
|
38 { |
|
39 if (state) |
|
40 xEventGroupSetBits(xEventGroupMQTT, TASK_MQTT_CONNECT); |
|
41 else |
|
42 xEventGroupSetBits(xEventGroupMQTT, TASK_MQTT_DISCONNECT); |
|
43 } |
|
44 |
|
45 |
|
46 |
|
47 bool ready_mqtt(void) |
|
48 { |
|
49 if (xEventGroupGetBits(xEventGroupMQTT) & TASK_MQTT_CONNECTED) |
|
50 return true; |
|
51 return false; |
|
52 } |
|
53 |
|
54 |
|
55 |
|
56 char *payload_header(void) |
|
57 { |
|
58 char *tmp, buf[128]; |
|
59 esp_err_t err; |
|
60 |
|
61 tmp = xstrcpy((char *)"{\"seq\":"); |
|
62 sprintf(buf, "%lld", Sequence++); |
|
63 err = nvs_set_u64(seq_handle, "Sequence_cnt", Sequence); |
|
64 if (err != ESP_OK) |
|
65 ESP_LOGE(TAG, "Error %s write Sequence to NVS", esp_err_to_name(err)); |
|
66 tmp = xstrcat(tmp, buf); |
|
67 tmp = xstrcat(tmp, (char *)",\"metric\":"); |
|
68 return tmp; |
|
69 } |
|
70 |
|
71 |
|
72 |
|
73 char *topic_base(char *msgtype) |
|
74 { |
|
75 char *tmp; |
|
76 |
|
77 tmp = xstrcpy((char *)"mbv1.0/pressure/"); |
|
78 tmp = xstrcat(tmp, msgtype); |
|
79 tmp = xstrcat(tmp, (char *)"/"); |
|
80 tmp = xstrcat(tmp, config.hostname); |
|
81 return tmp; |
|
82 } |
|
83 |
|
84 |
|
85 |
|
86 void publisher(char *topic, char *payload) |
|
87 { |
|
88 // publish the data |
|
89 if (payload) |
|
90 esp_mqtt_client_publish(client, topic, payload, strlen(payload), 1, 0); |
|
91 else |
|
92 esp_mqtt_client_publish(client, topic, NULL, 0, 1, 0); |
|
93 } |
|
94 |
|
95 |
|
96 |
|
97 char *unit_data(int i) |
|
98 { |
|
99 char *payload = NULL; |
|
100 char buf[128]; |
|
101 |
|
102 payload = xstrcpy((char *)"{\"uuid\":\""); |
|
103 payload = xstrcat(payload, units[i].uuid); |
|
104 payload = xstrcat(payload, (char *)"\",\"alias\":\""); |
|
105 payload = xstrcat(payload, units[i].alias); |
|
106 |
|
107 // temperature_state temperature_address temperature |
|
108 payload = xstrcat(payload, (char *)"\",\"temperature\":{\"state\":\""); |
|
109 payload = xstrcat(payload, (char *)sensState[units[i].temperature_state]); |
|
110 payload = xstrcat(payload, (char *)"\",\"address\":\""); |
|
111 payload = xstrcat(payload, (char *)units[i].temperature_address); |
|
112 payload = xstrcat(payload, (char *)"\",\"temperature\":"); |
|
113 sprintf(buf, "%.3f", units[i].temperature / 1000.0); |
|
114 payload = xstrcat(payload, buf); |
|
115 |
|
116 // pressure_state pressure_channel pressure_voltage pressure_zero pressure |
|
117 payload = xstrcat(payload, (char *)"},\"pressure\":{\"state\":\""); |
|
118 payload = xstrcat(payload, (char *)sensState[units[i].pressure_state]); |
|
119 payload = xstrcat(payload, (char *)"\",\"channel\":"); |
|
120 sprintf(buf, "%d", units[i].pressure_channel); |
|
121 payload = xstrcat(payload, buf); |
|
122 payload = xstrcat(payload, (char *)",\"voltage\":"); |
|
123 sprintf(buf, "%.3f", units[i].pressure_voltage / 1000.0); |
|
124 payload = xstrcat(payload, buf); |
|
125 payload = xstrcat(payload, (char *)",\"zero\":"); |
|
126 sprintf(buf, "%.3f", units[i].pressure_zero / 1000.0); |
|
127 payload = xstrcat(payload, buf); |
|
128 payload = xstrcat(payload, (char *)",\"bar\":"); |
|
129 sprintf(buf, "%.3f", units[i].pressure / 1000.0); |
|
130 payload = xstrcat(payload, buf); |
|
131 payload = xstrcat(payload, (char *)"},\"mode\":\""); |
|
132 payload = xstrcat(payload, (char *)unitMode[units[i].mode]); |
|
133 payload = xstrcat(payload, (char *)"\"}"); |
|
134 return payload; |
|
135 } |
|
136 |
|
137 |
|
138 |
|
/**
 * @brief Publish the data of all three units in one DBIRTH message.
 *
 * The payload is the header from payload_header() followed by
 * `{"units":[ ... ]}}`, with one unit_data() object per unit separated by
 * commas. Every intermediate string is released before returning.
 */
void publishUnits(void)
{
    char *msg = payload_header();

    msg = xstrcat(msg, (char *)"{\"units\":[");
    for (int idx = 0; idx < 3; idx++) {
        /* Separator goes in front of every element except the first. */
        if (idx > 0)
            msg = xstrcat(msg, (char *)",");

        char *unit = unit_data(idx);
        msg = xstrcat(msg, unit);
        free(unit);
    }
    msg = xstrcat(msg, (char *)"]}}");

    char *topic = topic_base((char *)"DBIRTH");
    publisher(topic, msg);
    free(topic);
    free(msg);
}
|
164 |
|
165 |
|
166 |
|
167 void publishNode(void) |
|
168 { |
|
169 char *topic = NULL, *payload = NULL, buf[64]; |
|
170 esp_chip_info_t chip_info; |
|
171 |
|
172 esp_chip_info(&chip_info); |
|
173 payload = payload_header(); |
|
174 payload = xstrcat(payload, (char *)"{\"uuid\":\""); |
|
175 payload = xstrcat(payload, config.uuid); |
|
176 payload = xstrcat(payload, (char *)"\","); |
|
177 payload = xstrcat(payload, (char *)"\"properties\":{\"hardwaremake\":\"Unknown\",\"hardwaremodel\":\""); |
|
178 sprintf(buf, "ESP32 %d CPU WiFi%s%s rev %d", chip_info.cores, |
|
179 (chip_info.features & CHIP_FEATURE_BT) ? "/BT" : "", |
|
180 (chip_info.features & CHIP_FEATURE_BLE) ? "/BLE" : "", |
|
181 chip_info.revision); |
|
182 payload = xstrcat(payload, buf); |
|
183 payload = xstrcat(payload, (char *)"\",\"os\":\"FreeRTOS\",\"os_version\":\"Unknown\",\"FW\":\""); |
|
184 payload = xstrcat(payload, (char *)app_desc->version); |
|
185 payload = xstrcat(payload, (char *)"\"}"); |
|
186 |
|
187 if (xSemaphoreTake(xSemaphoreDS18B20, 10) == pdTRUE) { |
|
188 payload = xstrcat(payload, (char *)",\"THB\":{\"temperature\":"); |
|
189 sprintf(buf, "%.3f", ds18b20_state->bottle_temperature); |
|
190 payload = xstrcat(payload, buf); |
|
191 payload = xstrcat(payload, (char *)"}"); |
|
192 xSemaphoreGive(xSemaphoreDS18B20); |
|
193 } |
|
194 |
|
195 if (xSemaphoreTake(xSemaphoreWiFi, 10) == pdTRUE) { |
|
196 payload = xstrcat(payload, (char *)",\"net\":{\"address\":\""); |
|
197 payload = xstrcat(payload, wifi_state->STA_ip); |
|
198 payload = xstrcat(payload, (char *)"\",\"ifname\":\"sta\",\"rssi\":"); |
|
199 sprintf(buf, "%d", wifi_state->STA_rssi); |
|
200 payload = xstrcat(payload, buf); |
|
201 payload = xstrcat(payload, (char *)"}"); |
|
202 xSemaphoreGive(xSemaphoreWiFi); |
|
203 } |
|
204 |
|
205 payload = xstrcat(payload, (char *)"}}"); |
|
206 // Only NBIRTH messages, no NDATA in this project. |
|
207 topic = topic_base((char *)"NBIRTH"); |
|
208 publisher(topic, payload); |
|
209 free(topic); |
|
210 topic = NULL; |
|
211 free(payload); |
|
212 payload = NULL; |
|
213 } |
|
214 |
|
215 |
|
216 |
|
217 static esp_err_t mqtt_event_handler_cb(esp_mqtt_event_handle_t event) |
|
218 { |
|
219 switch (event->event_id) { |
|
220 |
|
221 case MQTT_EVENT_CONNECTED: |
|
222 ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED"); |
|
223 xEventGroupSetBits(xEventGroupMQTT, TASK_MQTT_CONNECTED); |
|
224 |
|
225 // topic = topic_base((char *)"NCMD"); |
|
226 // topic = xstrcat(topic, (char *)"/#"); |
|
227 // ESP_LOGI(TAG, "Subscribe %s", topic); |
|
228 // msg_id = esp_mqtt_client_subscribe(client, topic, 0); |
|
229 // ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id); |
|
230 // free(topic); |
|
231 |
|
232 // topic = topic_base((char *)"DCMD"); |
|
233 // topic = xstrcat(topic, (char *)"/#"); |
|
234 // ESP_LOGI(TAG, "Subscribe %s", topic); |
|
235 // msg_id = esp_mqtt_client_subscribe(client, topic, 1); |
|
236 // ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id); |
|
237 // free(topic); |
|
238 // topic = NULL; |
|
239 break; |
|
240 |
|
241 case MQTT_EVENT_DISCONNECTED: |
|
242 ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED"); |
|
243 xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECTED); |
|
244 break; |
|
245 |
|
246 case MQTT_EVENT_SUBSCRIBED: |
|
247 ESP_LOGI(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d", event->msg_id); |
|
248 break; |
|
249 |
|
250 case MQTT_EVENT_UNSUBSCRIBED: |
|
251 ESP_LOGI(TAG, "MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event->msg_id); |
|
252 break; |
|
253 |
|
254 case MQTT_EVENT_PUBLISHED: |
|
255 ESP_LOGI(TAG, "MQTT_EVENT_PUBLISHED, msg_id=%d", event->msg_id); |
|
256 break; |
|
257 |
|
258 case MQTT_EVENT_DATA: |
|
259 ESP_LOGI(TAG, "MQTT_EVENT_DATA"); |
|
260 printf("TOPIC=%.*s\r\n", event->topic_len, event->topic); |
|
261 printf("DATA=%.*s\r\n", event->data_len, event->data); |
|
262 break; |
|
263 |
|
264 case MQTT_EVENT_ERROR: |
|
265 ESP_LOGI(TAG, "MQTT_EVENT_ERROR"); |
|
266 break; |
|
267 |
|
268 case MQTT_EVENT_BEFORE_CONNECT: |
|
269 ESP_LOGI(TAG, "MQTT_EVENT_BEFORE_CONNECT"); |
|
270 // Configure connection can be here. |
|
271 break; |
|
272 |
|
273 default: |
|
274 ESP_LOGI(TAG, "Other event id:%d", event->event_id); |
|
275 break; |
|
276 } |
|
277 return ESP_OK; |
|
278 } |
|
279 |
|
280 |
|
281 |
|
282 static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data) { |
|
283 ESP_LOGI(TAG, "Event dispatched from event loop base=%s, event_id=%d", base, event_id); |
|
284 mqtt_event_handler_cb(event_data); |
|
285 } |
|
286 |
|
287 |
|
288 |
|
289 |
|
290 /* |
|
291 * Task to read temperature sensors on request. |
|
292 */ |
|
293 void task_mqtt(void *pvParameter) |
|
294 { |
|
295 esp_err_t err; |
|
296 |
|
297 ESP_LOGI(TAG, "Starting MQTT task"); |
|
298 |
|
299 /* |
|
300 * Initialize Sequence counter from NVS |
|
301 */ |
|
302 err = nvs_open("storage", NVS_READWRITE, &seq_handle); |
|
303 if (err != ESP_OK) { |
|
304 ESP_LOGI(TAG, "Error (%s) opening NVS handle", esp_err_to_name(err)); |
|
305 } else { |
|
306 err = nvs_get_u64(seq_handle, "Sequence_cnt", &Sequence); |
|
307 switch (err) { |
|
308 case ESP_OK: |
|
309 ESP_LOGI(TAG, "Sequence counter from NVS = %lld", Sequence); |
|
310 break; |
|
311 |
|
312 case ESP_ERR_NVS_NOT_FOUND: |
|
313 ESP_LOGI(TAG, "Sequence counter not found"); |
|
314 break; |
|
315 |
|
316 default: |
|
317 ESP_LOGI(TAG, "Error (%s) init Sequence", esp_err_to_name(err)); |
|
318 break; |
|
319 } |
|
320 } |
|
321 |
|
322 /* event handler and event group for the wifi driver */ |
|
323 xEventGroupMQTT = xEventGroupCreate(); |
|
324 EventBits_t uxBits; |
|
325 esp_mqtt_client_config_t mqtt_cfg = { |
|
326 .uri = "mqtt://seaport.mbse.ym", |
|
327 }; |
|
328 /*esp_mqtt_client_handle_t */ client = esp_mqtt_client_init(&mqtt_cfg); |
|
329 esp_mqtt_client_register_event(client, ESP_EVENT_ANY_ID, mqtt_event_handler, client); |
|
330 |
|
331 /* |
|
332 * Task loop forever. |
|
333 */ |
|
334 while (1) { |
|
335 |
|
336 uxBits = xEventGroupWaitBits(xEventGroupMQTT, TASK_MQTT_CONNECT | TASK_MQTT_DISCONNECT, pdFALSE, pdFALSE, portMAX_DELAY ); |
|
337 |
|
338 if (uxBits & TASK_MQTT_CONNECT) { |
|
339 ESP_LOGI(TAG, "Request MQTT connect"); |
|
340 err = esp_mqtt_client_start(client); |
|
341 ESP_LOGI(TAG, "Result %s", esp_err_to_name(err)); |
|
342 xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECT); |
|
343 |
|
344 } else if (uxBits & TASK_MQTT_DISCONNECT) { |
|
345 ESP_LOGI(TAG, "Request MQTT disconnect"); |
|
346 // publish disconnect messages |
|
347 err = esp_mqtt_client_stop(client); |
|
348 ESP_LOGI(TAG, "Result %s", esp_err_to_name(err)); |
|
349 |
|
350 err = nvs_commit(seq_handle); |
|
351 if (err != ESP_OK) |
|
352 ESP_LOGE(TAG, "Error %s commit NVS", esp_err_to_name(err)); |
|
353 xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_DISCONNECT); |
|
354 xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECTED); |
|
355 } |
|
356 vTaskDelay( (TickType_t)10); |
|
357 } |
|
358 } |
|
359 |