main/task_mqtt.c

changeset 11
e33f2d325d15
parent 9
a85995941d0d
child 12
7dc9003f86a8
equal deleted inserted replaced
10:d08c7466bb40 11:e33f2d325d15
8 8
9 9
10 static const char *TAG = "task_mqtt"; 10 static const char *TAG = "task_mqtt";
11 11
12 EventGroupHandle_t xEventGroupMQTT; ///< Events MQTT task 12 EventGroupHandle_t xEventGroupMQTT; ///< Events MQTT task
13 13 SemaphoreHandle_t xSemaphorePcounter; ///< Publish counter semaphore.
14 uint64_t Sequence = 0; ///< Sequence stored in NVS 14 uint64_t Sequence = 0; ///< Sequence stored in NVS
15 nvs_handle_t seq_handle; ///< NVS handle 15 nvs_handle_t seq_handle; ///< NVS handle
16 16 int count_pub = 0; ///< Outstanding published messages.
17 esp_mqtt_client_handle_t client; ///< MQTT client handle 17 esp_mqtt_client_handle_t client; ///< MQTT client handle
18
18 const int TASK_MQTT_CONNECT = BIT0; ///< Request MQTT connection 19 const int TASK_MQTT_CONNECT = BIT0; ///< Request MQTT connection
19 const int TASK_MQTT_DISCONNECT = BIT1; ///< Request MQTT disconnect 20 const int TASK_MQTT_DISCONNECT = BIT1; ///< Request MQTT disconnect
20 const int TASK_MQTT_CONNECTED = BIT2; ///< MQTT is connected 21 const int TASK_MQTT_CONNECTED = BIT2; ///< MQTT is connected
21 22
22 const char *sensState[] = { "OK", "ERROR" }; 23 const char *sensState[] = { "OK", "ERROR" };
26 extern SemaphoreHandle_t xSemaphoreDS18B20; ///< DS18B20 lock semaphore 27 extern SemaphoreHandle_t xSemaphoreDS18B20; ///< DS18B20 lock semaphore
27 extern ADC_State *adc_state; ///< ADC state 28 extern ADC_State *adc_state; ///< ADC state
28 extern SemaphoreHandle_t xSemaphoreADC; ///< ADC lock semaphore 29 extern SemaphoreHandle_t xSemaphoreADC; ///< ADC lock semaphore
29 extern WIFI_State *wifi_state; ///< WiFi state 30 extern WIFI_State *wifi_state; ///< WiFi state
30 extern SemaphoreHandle_t xSemaphoreWiFi; ///< WiFi lock semaphore 31 extern SemaphoreHandle_t xSemaphoreWiFi; ///< WiFi lock semaphore
31
32 extern unit_t units[3]; 32 extern unit_t units[3];
33 extern SemaphoreHandle_t xSemaphoreUnits; 33 extern SemaphoreHandle_t xSemaphoreUnits;
34
35 extern const esp_app_desc_t *app_desc; 34 extern const esp_app_desc_t *app_desc;
36 35
37 36
38 37
39 void connect_mqtt(bool state) 38 void connect_mqtt(bool state)
90 // publish the data 89 // publish the data
91 if (payload) 90 if (payload)
92 esp_mqtt_client_publish(client, topic, payload, strlen(payload), 1, 0); 91 esp_mqtt_client_publish(client, topic, payload, strlen(payload), 1, 0);
93 else 92 else
94 esp_mqtt_client_publish(client, topic, NULL, 0, 1, 0); 93 esp_mqtt_client_publish(client, topic, NULL, 0, 1, 0);
94 if (xSemaphoreTake(xSemaphorePcounter, 10) == pdTRUE) {
95 count_pub++;
96 printf(" up %d\n", count_pub);
97 xSemaphoreGive(xSemaphorePcounter);
98 } else {
99 ESP_LOGE(TAG, "Missed lock 1");
100 }
95 } 101 }
96 102
97 103
98 104
99 char *unit_data(int i) 105 char *unit_data(int i)
181 payload = payload_header(); 187 payload = payload_header();
182 payload = xstrcat(payload, (char *)"{\"uuid\":\""); 188 payload = xstrcat(payload, (char *)"{\"uuid\":\"");
183 payload = xstrcat(payload, config.uuid); 189 payload = xstrcat(payload, config.uuid);
184 payload = xstrcat(payload, (char *)"\","); 190 payload = xstrcat(payload, (char *)"\",");
185 payload = xstrcat(payload, (char *)"\"properties\":{\"hardwaremake\":\"Unknown\",\"hardwaremodel\":\""); 191 payload = xstrcat(payload, (char *)"\"properties\":{\"hardwaremake\":\"Unknown\",\"hardwaremodel\":\"");
186 sprintf(buf, "ESP32 %d CPU WiFi%s%s rev %d", chip_info.cores, 192 sprintf(buf, "ESP32 %d cores rev %d, WiFi bgn", chip_info.cores, chip_info.revision);
187 (chip_info.features & CHIP_FEATURE_BT) ? "/BT" : "",
188 (chip_info.features & CHIP_FEATURE_BLE) ? "/BLE" : "",
189 chip_info.revision);
190 payload = xstrcat(payload, buf); 193 payload = xstrcat(payload, buf);
191 payload = xstrcat(payload, (char *)"\",\"os\":\"FreeRTOS\",\"os_version\":\"Unknown\",\"FW\":\""); 194 payload = xstrcat(payload, (char *)"\",\"os\":\"FreeRTOS\",\"os_version\":\"Unknown\",\"FW\":\"");
192 payload = xstrcat(payload, (char *)app_desc->version); 195 payload = xstrcat(payload, (char *)app_desc->version);
193 payload = xstrcat(payload, (char *)"\"}"); 196 payload = xstrcat(payload, (char *)"\"}");
194 197
277 ESP_LOGI(TAG, "MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event->msg_id); 280 ESP_LOGI(TAG, "MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event->msg_id);
278 break; 281 break;
279 282
280 case MQTT_EVENT_PUBLISHED: 283 case MQTT_EVENT_PUBLISHED:
281 ESP_LOGI(TAG, "MQTT_EVENT_PUBLISHED, msg_id=%d", event->msg_id); 284 ESP_LOGI(TAG, "MQTT_EVENT_PUBLISHED, msg_id=%d", event->msg_id);
285 if (xSemaphoreTake(xSemaphorePcounter, 10) == pdTRUE) {
286 if (count_pub) {
287 count_pub--;
288 printf("down %d\n", count_pub);
289 }
290 xSemaphoreGive(xSemaphorePcounter);
291 } else {
292 ESP_LOGE(TAG, "Missed lock 2");
293 }
282 break; 294 break;
283 295
284 case MQTT_EVENT_DATA: 296 case MQTT_EVENT_DATA:
285 ESP_LOGI(TAG, "MQTT_EVENT_DATA"); 297 ESP_LOGI(TAG, "MQTT_EVENT_DATA");
286 printf("TOPIC=%.*s\r\n", event->topic_len, event->topic); 298 printf("TOPIC=%.*s\r\n", event->topic_len, event->topic);
304 } 316 }
305 317
306 318
307 319
308 static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data) { 320 static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data) {
309 // ESP_LOGI(TAG, "Event dispatched from event loop base=%s, event_id=%d", base, event_id);
310 mqtt_event_handler_cb(event_data); 321 mqtt_event_handler_cb(event_data);
311 } 322 }
312
313 323
314 324
315 325
316 /* 326 /*
317 * Task to read temperature sensors on request. 327 * Task to read temperature sensors on request.
319 void task_mqtt(void *pvParameter) 329 void task_mqtt(void *pvParameter)
320 { 330 {
321 esp_err_t err; 331 esp_err_t err;
322 332
323 ESP_LOGI(TAG, "Starting MQTT task"); 333 ESP_LOGI(TAG, "Starting MQTT task");
334 xSemaphorePcounter = xSemaphoreCreateMutex();
324 335
325 /* 336 /*
326 * Initialize Sequence counter from NVS 337 * Initialize Sequence counter from NVS
327 */ 338 */
328 err = nvs_open("storage", NVS_READWRITE, &seq_handle); 339 err = nvs_open("storage", NVS_READWRITE, &seq_handle);
349 xEventGroupMQTT = xEventGroupCreate(); 360 xEventGroupMQTT = xEventGroupCreate();
350 EventBits_t uxBits; 361 EventBits_t uxBits;
351 esp_mqtt_client_config_t mqtt_cfg = { 362 esp_mqtt_client_config_t mqtt_cfg = {
352 .uri = "mqtt://seaport.mbse.ym", 363 .uri = "mqtt://seaport.mbse.ym",
353 }; 364 };
354 /*esp_mqtt_client_handle_t */ client = esp_mqtt_client_init(&mqtt_cfg); 365 client = esp_mqtt_client_init(&mqtt_cfg);
355 esp_mqtt_client_register_event(client, ESP_EVENT_ANY_ID, mqtt_event_handler, client); 366 esp_mqtt_client_register_event(client, ESP_EVENT_ANY_ID, mqtt_event_handler, client);
356 367
357 /* 368 /*
358 * Task loop forever. 369 * Task loop forever.
359 */ 370 */
362 uxBits = xEventGroupWaitBits(xEventGroupMQTT, TASK_MQTT_CONNECT | TASK_MQTT_DISCONNECT, pdFALSE, pdFALSE, portMAX_DELAY ); 373 uxBits = xEventGroupWaitBits(xEventGroupMQTT, TASK_MQTT_CONNECT | TASK_MQTT_DISCONNECT, pdFALSE, pdFALSE, portMAX_DELAY );
363 374
364 if (uxBits & TASK_MQTT_CONNECT) { 375 if (uxBits & TASK_MQTT_CONNECT) {
365 ESP_LOGI(TAG, "Request MQTT connect"); 376 ESP_LOGI(TAG, "Request MQTT connect");
366 err = esp_mqtt_client_start(client); 377 err = esp_mqtt_client_start(client);
367 ESP_LOGI(TAG, "Result %s", esp_err_to_name(err)); 378 if (err != ESP_OK)
379 ESP_LOGI(TAG, "Result %s", esp_err_to_name(err));
368 xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECT); 380 xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECT);
369 381
370 } else if (uxBits & TASK_MQTT_DISCONNECT) { 382 } else if (uxBits & TASK_MQTT_DISCONNECT) {
371 ESP_LOGI(TAG, "Request MQTT disconnect"); 383 ESP_LOGI(TAG, "Request MQTT disconnect");
372 // publish disconnect messages 384 esp_mqtt_client_stop(client);
373 err = esp_mqtt_client_stop(client);
374 ESP_LOGI(TAG, "Result %s", esp_err_to_name(err));
375
376 err = nvs_commit(seq_handle); 385 err = nvs_commit(seq_handle);
377 if (err != ESP_OK) 386 if (err != ESP_OK)
378 ESP_LOGE(TAG, "Error %s commit NVS", esp_err_to_name(err)); 387 ESP_LOGE(TAG, "Error %s commit NVS", esp_err_to_name(err));
379 xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_DISCONNECT); 388 xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_DISCONNECT);
380 xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECTED); 389 xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECTED);
381 } 390 }
382 vTaskDelay( (TickType_t)10); 391 }
383 } 392 }
384 } 393
385

mercurial