8 |
8 |
9 |
9 |
10 static const char *TAG = "task_mqtt"; |
10 static const char *TAG = "task_mqtt"; |
11 |
11 |
12 EventGroupHandle_t xEventGroupMQTT; ///< Events MQTT task |
12 EventGroupHandle_t xEventGroupMQTT; ///< Events MQTT task |
13 |
13 SemaphoreHandle_t xSemaphorePcounter; ///< Publish counter semaphore. |
14 uint64_t Sequence = 0; ///< Sequence stored in NVS |
14 uint64_t Sequence = 0; ///< Sequence stored in NVS |
15 nvs_handle_t seq_handle; ///< NVS handle |
15 nvs_handle_t seq_handle; ///< NVS handle |
16 |
16 int count_pub = 0; ///< Outstanding published messages. |
17 esp_mqtt_client_handle_t client; ///< MQTT client handle |
17 esp_mqtt_client_handle_t client; ///< MQTT client handle |
|
18 |
18 const int TASK_MQTT_CONNECT = BIT0; ///< Request MQTT connection |
19 const int TASK_MQTT_CONNECT = BIT0; ///< Request MQTT connection |
19 const int TASK_MQTT_DISCONNECT = BIT1; ///< Request MQTT disconnect |
20 const int TASK_MQTT_DISCONNECT = BIT1; ///< Request MQTT disconnect |
20 const int TASK_MQTT_CONNECTED = BIT2; ///< MQTT is connected |
21 const int TASK_MQTT_CONNECTED = BIT2; ///< MQTT is connected |
21 |
22 |
22 const char *sensState[] = { "OK", "ERROR" }; |
23 const char *sensState[] = { "OK", "ERROR" }; |
26 extern SemaphoreHandle_t xSemaphoreDS18B20; ///< DS18B20 lock semaphore |
27 extern SemaphoreHandle_t xSemaphoreDS18B20; ///< DS18B20 lock semaphore |
27 extern ADC_State *adc_state; ///< ADC state |
28 extern ADC_State *adc_state; ///< ADC state |
28 extern SemaphoreHandle_t xSemaphoreADC; ///< ADC lock semaphore |
29 extern SemaphoreHandle_t xSemaphoreADC; ///< ADC lock semaphore |
29 extern WIFI_State *wifi_state; ///< WiFi state |
30 extern WIFI_State *wifi_state; ///< WiFi state |
30 extern SemaphoreHandle_t xSemaphoreWiFi; ///< WiFi lock semaphore |
31 extern SemaphoreHandle_t xSemaphoreWiFi; ///< WiFi lock semaphore |
31 |
|
32 extern unit_t units[3]; |
32 extern unit_t units[3]; |
33 extern SemaphoreHandle_t xSemaphoreUnits; |
33 extern SemaphoreHandle_t xSemaphoreUnits; |
34 |
|
35 extern const esp_app_desc_t *app_desc; |
34 extern const esp_app_desc_t *app_desc; |
36 |
35 |
37 |
36 |
38 |
37 |
39 void connect_mqtt(bool state) |
38 void connect_mqtt(bool state) |
90 // publish the data |
89 // publish the data |
91 if (payload) |
90 if (payload) |
92 esp_mqtt_client_publish(client, topic, payload, strlen(payload), 1, 0); |
91 esp_mqtt_client_publish(client, topic, payload, strlen(payload), 1, 0); |
93 else |
92 else |
94 esp_mqtt_client_publish(client, topic, NULL, 0, 1, 0); |
93 esp_mqtt_client_publish(client, topic, NULL, 0, 1, 0); |
|
94 if (xSemaphoreTake(xSemaphorePcounter, 10) == pdTRUE) { |
|
95 count_pub++; |
|
96 printf(" up %d\n", count_pub); |
|
97 xSemaphoreGive(xSemaphorePcounter); |
|
98 } else { |
|
99 ESP_LOGE(TAG, "Missed lock 1"); |
|
100 } |
95 } |
101 } |
96 |
102 |
97 |
103 |
98 |
104 |
99 char *unit_data(int i) |
105 char *unit_data(int i) |
181 payload = payload_header(); |
187 payload = payload_header(); |
182 payload = xstrcat(payload, (char *)"{\"uuid\":\""); |
188 payload = xstrcat(payload, (char *)"{\"uuid\":\""); |
183 payload = xstrcat(payload, config.uuid); |
189 payload = xstrcat(payload, config.uuid); |
184 payload = xstrcat(payload, (char *)"\","); |
190 payload = xstrcat(payload, (char *)"\","); |
185 payload = xstrcat(payload, (char *)"\"properties\":{\"hardwaremake\":\"Unknown\",\"hardwaremodel\":\""); |
191 payload = xstrcat(payload, (char *)"\"properties\":{\"hardwaremake\":\"Unknown\",\"hardwaremodel\":\""); |
186 sprintf(buf, "ESP32 %d CPU WiFi%s%s rev %d", chip_info.cores, |
192 sprintf(buf, "ESP32 %d cores rev %d, WiFi bgn", chip_info.cores, chip_info.revision); |
187 (chip_info.features & CHIP_FEATURE_BT) ? "/BT" : "", |
|
188 (chip_info.features & CHIP_FEATURE_BLE) ? "/BLE" : "", |
|
189 chip_info.revision); |
|
190 payload = xstrcat(payload, buf); |
193 payload = xstrcat(payload, buf); |
191 payload = xstrcat(payload, (char *)"\",\"os\":\"FreeRTOS\",\"os_version\":\"Unknown\",\"FW\":\""); |
194 payload = xstrcat(payload, (char *)"\",\"os\":\"FreeRTOS\",\"os_version\":\"Unknown\",\"FW\":\""); |
192 payload = xstrcat(payload, (char *)app_desc->version); |
195 payload = xstrcat(payload, (char *)app_desc->version); |
193 payload = xstrcat(payload, (char *)"\"}"); |
196 payload = xstrcat(payload, (char *)"\"}"); |
194 |
197 |
277 ESP_LOGI(TAG, "MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event->msg_id); |
280 ESP_LOGI(TAG, "MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event->msg_id); |
278 break; |
281 break; |
279 |
282 |
280 case MQTT_EVENT_PUBLISHED: |
283 case MQTT_EVENT_PUBLISHED: |
281 ESP_LOGI(TAG, "MQTT_EVENT_PUBLISHED, msg_id=%d", event->msg_id); |
284 ESP_LOGI(TAG, "MQTT_EVENT_PUBLISHED, msg_id=%d", event->msg_id); |
|
285 if (xSemaphoreTake(xSemaphorePcounter, 10) == pdTRUE) { |
|
286 if (count_pub) { |
|
287 count_pub--; |
|
288 printf("down %d\n", count_pub); |
|
289 } |
|
290 xSemaphoreGive(xSemaphorePcounter); |
|
291 } else { |
|
292 ESP_LOGE(TAG, "Missed lock 2"); |
|
293 } |
282 break; |
294 break; |
283 |
295 |
284 case MQTT_EVENT_DATA: |
296 case MQTT_EVENT_DATA: |
285 ESP_LOGI(TAG, "MQTT_EVENT_DATA"); |
297 ESP_LOGI(TAG, "MQTT_EVENT_DATA"); |
286 printf("TOPIC=%.*s\r\n", event->topic_len, event->topic); |
298 printf("TOPIC=%.*s\r\n", event->topic_len, event->topic); |
304 } |
316 } |
305 |
317 |
306 |
318 |
307 |
319 |
308 static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data) { |
320 static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data) { |
309 // ESP_LOGI(TAG, "Event dispatched from event loop base=%s, event_id=%d", base, event_id); |
|
310 mqtt_event_handler_cb(event_data); |
321 mqtt_event_handler_cb(event_data); |
311 } |
322 } |
312 |
|
313 |
323 |
314 |
324 |
315 |
325 |
316 /* |
326 /* |
317 * Task to read temperature sensors on request. |
327 * Task to read temperature sensors on request. |
319 void task_mqtt(void *pvParameter) |
329 void task_mqtt(void *pvParameter) |
320 { |
330 { |
321 esp_err_t err; |
331 esp_err_t err; |
322 |
332 |
323 ESP_LOGI(TAG, "Starting MQTT task"); |
333 ESP_LOGI(TAG, "Starting MQTT task"); |
|
334 xSemaphorePcounter = xSemaphoreCreateMutex(); |
324 |
335 |
325 /* |
336 /* |
326 * Initialize Sequence counter from NVS |
337 * Initialize Sequence counter from NVS |
327 */ |
338 */ |
328 err = nvs_open("storage", NVS_READWRITE, &seq_handle); |
339 err = nvs_open("storage", NVS_READWRITE, &seq_handle); |
349 xEventGroupMQTT = xEventGroupCreate(); |
360 xEventGroupMQTT = xEventGroupCreate(); |
350 EventBits_t uxBits; |
361 EventBits_t uxBits; |
351 esp_mqtt_client_config_t mqtt_cfg = { |
362 esp_mqtt_client_config_t mqtt_cfg = { |
352 .uri = "mqtt://seaport.mbse.ym", |
363 .uri = "mqtt://seaport.mbse.ym", |
353 }; |
364 }; |
354 /*esp_mqtt_client_handle_t */ client = esp_mqtt_client_init(&mqtt_cfg); |
365 client = esp_mqtt_client_init(&mqtt_cfg); |
355 esp_mqtt_client_register_event(client, ESP_EVENT_ANY_ID, mqtt_event_handler, client); |
366 esp_mqtt_client_register_event(client, ESP_EVENT_ANY_ID, mqtt_event_handler, client); |
356 |
367 |
357 /* |
368 /* |
358 * Task loop forever. |
369 * Task loop forever. |
359 */ |
370 */ |
362 uxBits = xEventGroupWaitBits(xEventGroupMQTT, TASK_MQTT_CONNECT | TASK_MQTT_DISCONNECT, pdFALSE, pdFALSE, portMAX_DELAY ); |
373 uxBits = xEventGroupWaitBits(xEventGroupMQTT, TASK_MQTT_CONNECT | TASK_MQTT_DISCONNECT, pdFALSE, pdFALSE, portMAX_DELAY ); |
363 |
374 |
364 if (uxBits & TASK_MQTT_CONNECT) { |
375 if (uxBits & TASK_MQTT_CONNECT) { |
365 ESP_LOGI(TAG, "Request MQTT connect"); |
376 ESP_LOGI(TAG, "Request MQTT connect"); |
366 err = esp_mqtt_client_start(client); |
377 err = esp_mqtt_client_start(client); |
367 ESP_LOGI(TAG, "Result %s", esp_err_to_name(err)); |
378 if (err != ESP_OK) |
|
379 ESP_LOGI(TAG, "Result %s", esp_err_to_name(err)); |
368 xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECT); |
380 xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECT); |
369 |
381 |
370 } else if (uxBits & TASK_MQTT_DISCONNECT) { |
382 } else if (uxBits & TASK_MQTT_DISCONNECT) { |
371 ESP_LOGI(TAG, "Request MQTT disconnect"); |
383 ESP_LOGI(TAG, "Request MQTT disconnect"); |
372 // publish disconnect messages |
384 esp_mqtt_client_stop(client); |
373 err = esp_mqtt_client_stop(client); |
|
374 ESP_LOGI(TAG, "Result %s", esp_err_to_name(err)); |
|
375 |
|
376 err = nvs_commit(seq_handle); |
385 err = nvs_commit(seq_handle); |
377 if (err != ESP_OK) |
386 if (err != ESP_OK) |
378 ESP_LOGE(TAG, "Error %s commit NVS", esp_err_to_name(err)); |
387 ESP_LOGE(TAG, "Error %s commit NVS", esp_err_to_name(err)); |
379 xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_DISCONNECT); |
388 xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_DISCONNECT); |
380 xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECTED); |
389 xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECTED); |
381 } |
390 } |
382 vTaskDelay( (TickType_t)10); |
391 } |
383 } |
392 } |
384 } |
393 |
385 |
|