main/task_mqtt.c

changeset 24
74609f70411e
parent 18
12506716211c
child 25
c5a9bde0268f
equal deleted inserted replaced
23:2cc30d828d6e 24:74609f70411e
77 77
78 78
79 79
80 void wait_mqtt(int time) 80 void wait_mqtt(int time)
81 { 81 {
82 EventBits_t uxBits; 82 // EventBits_t uxBits;
83 83
84 ESP_LOGI(TAG, "wait_mqtt(%d) 1", time);
85 if (xEventGroupGetBits(xEventGroupMQTT) & TASK_MQTT_CONNECTED) { 84 if (xEventGroupGetBits(xEventGroupMQTT) & TASK_MQTT_CONNECTED) {
86 uxBits = xEventGroupWaitBits(xEventGroupMQTT, TASK_MQTT_DISCONNECTED, pdTRUE, pdFALSE, time / portTICK_PERIOD_MS); 85 /*uxBits =*/ xEventGroupWaitBits(xEventGroupMQTT, TASK_MQTT_DISCONNECTED, pdTRUE, pdFALSE, time / portTICK_PERIOD_MS);
87 ESP_LOGI(TAG, "wait_mqtt(%d) 2 %lu", time, uxBits & TASK_MQTT_DISCONNECTED); 86 // ESP_LOGI(TAG, "wait_mqtt(%d) 2 %lu", time, uxBits & TASK_MQTT_DISCONNECTED);
88 } else { 87 // } else {
89 ESP_LOGI(TAG, "wait_mqtt(%d) 3 not connected", time); 88 // ESP_LOGI(TAG, "wait_mqtt(%d) 3 not connected", time);
90 } 89 }
91 } 90 }
92 91
93 92
94 93
240 239
241 240
242 241
243 static esp_err_t mqtt_event_handler_cb(esp_mqtt_event_handle_t event) 242 static esp_err_t mqtt_event_handler_cb(esp_mqtt_event_handle_t event)
244 { 243 {
245 char *topic = NULL; 244 char *subscr = NULL, *check = NULL;
246 245
247 switch (event->event_id) { 246 switch (event->event_id) {
248 247
249 case MQTT_EVENT_CONNECTED: 248 case MQTT_EVENT_CONNECTED:
250 ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED"); 249 ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED");
251 topic = topic_base(); 250 subscr = topic_base();
252 topic = xstrcat(topic, (char *)"output/set/#"); 251 subscr = xstrcat(subscr, (char *)"output/set/#");
253 ESP_LOGI(TAG, "Subscribe `%s' id %d", topic, esp_mqtt_client_subscribe(client, topic, 0)); 252 int msgid = esp_mqtt_client_subscribe(client, subscr, 0);
254 free(topic); 253 ESP_LOGI(TAG, "Subscribe `%s' id %d", subscr, msgid);
255 topic = NULL; 254 free(subscr);
255 subscr = NULL;
256 xEventGroupSetBits(xEventGroupMQTT, TASK_MQTT_CONNECTED); 256 xEventGroupSetBits(xEventGroupMQTT, TASK_MQTT_CONNECTED);
257 xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_DISCONNECTED); 257 xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_DISCONNECTED);
258 break; 258 break;
259 259
260 case MQTT_EVENT_DISCONNECTED: 260 case MQTT_EVENT_DISCONNECTED:
262 xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECTED); 262 xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECTED);
263 xEventGroupSetBits(xEventGroupMQTT, TASK_MQTT_DISCONNECTED); 263 xEventGroupSetBits(xEventGroupMQTT, TASK_MQTT_DISCONNECTED);
264 break; 264 break;
265 265
266 case MQTT_EVENT_SUBSCRIBED: 266 case MQTT_EVENT_SUBSCRIBED:
267 ESP_LOGI(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d", event->msg_id); 267 ESP_LOGD(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d", event->msg_id);
268 break; 268 break;
269 269
270 case MQTT_EVENT_UNSUBSCRIBED: 270 case MQTT_EVENT_UNSUBSCRIBED:
271 ESP_LOGI(TAG, "MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event->msg_id); 271 ESP_LOGI(TAG, "MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event->msg_id);
272 break; 272 break;
273 273
274 case MQTT_EVENT_PUBLISHED: 274 case MQTT_EVENT_PUBLISHED:
275 ESP_LOGI(TAG, "MQTT_EVENT_PUBLISHED, msg_id=%d", event->msg_id);
276 if (xSemaphoreTake(xSemaphorePcounter, 10) == pdTRUE) { 275 if (xSemaphoreTake(xSemaphorePcounter, 10) == pdTRUE) {
277 if (count_pub) { 276 if (count_pub) {
278 count_pub--; 277 count_pub--;
279 } 278 }
280 xSemaphoreGive(xSemaphorePcounter); 279 xSemaphoreGive(xSemaphorePcounter);
281 } else { 280 } else {
282 ESP_LOGE(TAG, "mqtt_event_handler_cb(() lock error event"); 281 ESP_LOGE(TAG, "mqtt_event_handler_cb(() lock error event");
283 } 282 }
283 ESP_LOGI(TAG, "MQTT_EVENT_PUBLISHED, msg_id=%d, %d msgs in queue", event->msg_id, count_pub);
284 break; 284 break;
285 285
286 case MQTT_EVENT_DATA: 286 case MQTT_EVENT_DATA:
287 ESP_LOGI(TAG, "MQTT_EVENT_DATA len=%d", event->data_len);
288 bool gotit = false; 287 bool gotit = false;
289 char data[65]; 288 char data[65], topic[128];
290 if (event->data_len < 65) 289 if (event->data_len < 65)
291 snprintf(data, 64, "%.*s", event->data_len, event->data); 290 snprintf(data, 64, "%.*s", event->data_len, event->data);
292 else 291 else
293 data[0] = '\0'; 292 data[0] = '\0';
294 293 if (event->topic_len < 128)
295 topic = topic_base(); 294 snprintf(topic, 127, "%.*s", event->topic_len, event->topic);
296 topic = xstrcat(topic, (char *)"output/set/1"); 295 else
297 if (strncmp(topic, event->topic, event->topic_len) == 0) { 296 topic[0] = '\0';
298 ESP_LOGI(TAG, "Got %s `%s' %d", topic, data, atoi(data)); 297 ESP_LOGI(TAG, "MQTT_EVENT_DATA %s %s", topic, data);
298
299 check = topic_base();
300 check = xstrcat(check, (char *)"output/set/1");
301 if (strncmp(check, event->topic, event->topic_len) == 0) {
302 ESP_LOGD(TAG, "Got %s `%s' %d", check, data, atoi(data));
299 gotit = true; 303 gotit = true;
300 Relay1 = (uint8_t)atoi(data); 304 Relay1 = (uint8_t)atoi(data);
301 nvsio_write_u8((char *)"out1", Relay1); 305 nvsio_write_u8((char *)"out1", Relay1);
302 } 306 }
303 free(topic); 307 free(check);
304 topic = NULL; 308 check = NULL;
305 309
306 topic = topic_base(); 310 check = topic_base();
307 topic = xstrcat(topic, (char *)"output/set/2"); 311 check = xstrcat(check, (char *)"output/set/2");
308 if (strncmp(topic, event->topic, event->topic_len) == 0) { 312 if (strncmp(check, event->topic, event->topic_len) == 0) {
309 ESP_LOGI(TAG, "Got %s `%s' %d", topic, data, atoi(data)); 313 ESP_LOGD(TAG, "Got %s `%s' %d", check, data, atoi(data));
310 gotit = true; 314 gotit = true;
311 Relay2 = (uint8_t)atoi(data); 315 Relay2 = (uint8_t)atoi(data);
312 nvsio_write_u8((char *)"out2", Relay2); 316 nvsio_write_u8((char *)"out2", Relay2);
313 } 317 }
314 free(topic); 318 free(check);
315 topic = NULL; 319 check = NULL;
316 320
317 topic = topic_base(); 321 check = topic_base();
318 topic = xstrcat(topic, (char *)"output/set/3"); 322 check = xstrcat(check, (char *)"output/set/3");
319 if (strncmp(topic, event->topic, event->topic_len) == 0) { 323 if (strncmp(check, event->topic, event->topic_len) == 0) {
320 ESP_LOGI(TAG, "Got %s `%s' %d", topic, data, atoi(data)); 324 ESP_LOGI(TAG, "Got %s `%s' %d", check, data, atoi(data));
321 gotit = true; 325 gotit = true;
322 Dimmer3 = (uint8_t)atoi(data); 326 Dimmer3 = (uint8_t)atoi(data);
323 nvsio_write_u8((char *)"out3", Dimmer3); 327 nvsio_write_u8((char *)"out3", Dimmer3);
324 } 328 }
325 free(topic); 329 free(check);
326 topic = NULL; 330 check = NULL;
327 331
328 topic = topic_base(); 332 check = topic_base();
329 topic = xstrcat(topic, (char *)"output/set/4"); 333 check = xstrcat(check, (char *)"output/set/4");
330 if (strncmp(topic, event->topic, event->topic_len) == 0) { 334 if (strncmp(check, event->topic, event->topic_len) == 0) {
331 ESP_LOGI(TAG, "Got %s `%s' %d", topic, data, atoi(data)); 335 ESP_LOGD(TAG, "Got %s `%s' %d", check, data, atoi(data));
332 gotit = true; 336 gotit = true;
333 Dimmer4 = (uint8_t)atoi(data); 337 Dimmer4 = (uint8_t)atoi(data);
334 nvsio_write_u8((char *)"out4", Dimmer4); 338 nvsio_write_u8((char *)"out4", Dimmer4);
335 } 339 }
336 free(topic); 340 free(check);
337 topic = NULL; 341 check = NULL;
338 342
339 if (! gotit) { 343 if (! gotit) {
340 printf("TOPIC=%.*s ", event->topic_len, event->topic); 344 printf("TOPIC=%.*s ", event->topic_len, event->topic);
341 printf("DATA=%.*s\r\n", event->data_len, event->data); 345 printf("DATA=%.*s\r\n", event->data_len, event->data);
342 } 346 }
425 ESP_LOGE(TAG, "Result %s", esp_err_to_name(err)); 429 ESP_LOGE(TAG, "Result %s", esp_err_to_name(err));
426 } 430 }
427 xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECT); 431 xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECT);
428 432
429 } else if (uxBits & TASK_MQTT_DISCONNECT) { 433 } else if (uxBits & TASK_MQTT_DISCONNECT) {
430 ESP_LOGI(TAG, "Request MQTT disconnect start"); 434 ESP_LOGI(TAG, "Request MQTT disconnect");
431 /* 435 /*
432 * Unsubscribe if connected 436 * Unsubscribe if connected
433 */ 437 */
434 if (ready_mqtt()) { 438 if (ready_mqtt()) {
435 char *topic = topic_base(); 439 char *topic = topic_base();
444 err = esp_mqtt_client_disconnect(client); 448 err = esp_mqtt_client_disconnect(client);
445 if (err != ESP_OK) { 449 if (err != ESP_OK) {
446 ESP_LOGE(TAG, "Result %s", esp_err_to_name(err)); 450 ESP_LOGE(TAG, "Result %s", esp_err_to_name(err));
447 } 451 }
448 xEventGroupWaitBits(xEventGroupMQTT, TASK_MQTT_DISCONNECTED, pdTRUE, pdFALSE, 500 / portTICK_PERIOD_MS); 452 xEventGroupWaitBits(xEventGroupMQTT, TASK_MQTT_DISCONNECTED, pdTRUE, pdFALSE, 500 / portTICK_PERIOD_MS);
449 ESP_LOGI(TAG, "disconnect confirmed");
450 453
451 /* 454 /*
452 * Finally stop the client because new connections start 455 * Finally stop the client because new connections start
453 * with a 'esp_mqtt_client_start()' command. 456 * with a 'esp_mqtt_client_start()' command.
454 * This will take about 5 seconds, but we don't need the network. 457 * This will take about 5 seconds, but we don't need the network.
455 */ 458 */
456 err = esp_mqtt_client_stop(client); 459 err = esp_mqtt_client_stop(client);
457 if (err != ESP_OK) { 460 if (err != ESP_OK) {
458 ESP_LOGE(TAG, "Result %s", esp_err_to_name(err)); 461 ESP_LOGE(TAG, "esp_mqtt_client_stop() result %s", esp_err_to_name(err));
459 } else {
460 ESP_LOGI(TAG, "stopped");
461 } 462 }
462 463
463 xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECTED); 464 xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECTED);
464 xEventGroupSetBits(xEventGroupMQTT, TASK_MQTT_DISCONNECTED); 465 xEventGroupSetBits(xEventGroupMQTT, TASK_MQTT_DISCONNECTED);
465 xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_DISCONNECT); 466 xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_DISCONNECT);

mercurial