main/task_mqtt.c

changeset 0
88d965579617
child 1
1082183cd6bb
equal deleted inserted replaced
-1:000000000000 0:88d965579617
1 /**
2 * @file task_mqtt.c
3 * @brief The FreeRTOS task to maintain MQTT connections.
4 */
5
6
7 #include "config.h"
8
9
10 static const char *TAG = "task_mqtt";
11
12 EventGroupHandle_t xEventGroupMQTT; ///< Events MQTT task
13
14 uint64_t Sequence = 0; ///< Sequence stored in NVS
15 nvs_handle_t seq_handle; ///< NVS handle
16
17 esp_mqtt_client_handle_t client; ///< MQTT client handle
18 const int TASK_MQTT_CONNECT = BIT0; ///< Request MQTT connection
19 const int TASK_MQTT_DISCONNECT = BIT1; ///< Request MQTT disconnect
20 const int TASK_MQTT_CONNECTED = BIT2; ///< MQTT is connected
21
22 const char *sensState[] = { "Ok", "Error" };
23 const char *unitMode[] = { "Off", "On" };
24
25 extern DS18B20_State *ds18b20_state; ///< DS18B20 state
26 extern SemaphoreHandle_t xSemaphoreDS18B20; ///< DS18B20 lock semaphore
27 extern ADC_State *adc_state; ///< ADC state
28 extern SemaphoreHandle_t xSemaphoreADC; ///< ADC lock semaphore
29 extern WIFI_State *wifi_state; ///< WiFi state
30 extern SemaphoreHandle_t xSemaphoreWiFi; ///< WiFi lock semaphore
31
32 extern unit_t units[3];
33 extern const esp_app_desc_t *app_desc;
34
35
36
37 void connect_mqtt(bool state)
38 {
39 if (state)
40 xEventGroupSetBits(xEventGroupMQTT, TASK_MQTT_CONNECT);
41 else
42 xEventGroupSetBits(xEventGroupMQTT, TASK_MQTT_DISCONNECT);
43 }
44
45
46
47 bool ready_mqtt(void)
48 {
49 if (xEventGroupGetBits(xEventGroupMQTT) & TASK_MQTT_CONNECTED)
50 return true;
51 return false;
52 }
53
54
55
56 char *payload_header(void)
57 {
58 char *tmp, buf[128];
59 esp_err_t err;
60
61 tmp = xstrcpy((char *)"{\"seq\":");
62 sprintf(buf, "%lld", Sequence++);
63 err = nvs_set_u64(seq_handle, "Sequence_cnt", Sequence);
64 if (err != ESP_OK)
65 ESP_LOGE(TAG, "Error %s write Sequence to NVS", esp_err_to_name(err));
66 tmp = xstrcat(tmp, buf);
67 tmp = xstrcat(tmp, (char *)",\"metric\":");
68 return tmp;
69 }
70
71
72
73 char *topic_base(char *msgtype)
74 {
75 char *tmp;
76
77 tmp = xstrcpy((char *)"mbv1.0/pressure/");
78 tmp = xstrcat(tmp, msgtype);
79 tmp = xstrcat(tmp, (char *)"/");
80 tmp = xstrcat(tmp, config.hostname);
81 return tmp;
82 }
83
84
85
86 void publisher(char *topic, char *payload)
87 {
88 // publish the data
89 if (payload)
90 esp_mqtt_client_publish(client, topic, payload, strlen(payload), 1, 0);
91 else
92 esp_mqtt_client_publish(client, topic, NULL, 0, 1, 0);
93 }
94
95
96
97 char *unit_data(int i)
98 {
99 char *payload = NULL;
100 char buf[128];
101
102 payload = xstrcpy((char *)"{\"uuid\":\"");
103 payload = xstrcat(payload, units[i].uuid);
104 payload = xstrcat(payload, (char *)"\",\"alias\":\"");
105 payload = xstrcat(payload, units[i].alias);
106
107 // temperature_state temperature_address temperature
108 payload = xstrcat(payload, (char *)"\",\"temperature\":{\"state\":\"");
109 payload = xstrcat(payload, (char *)sensState[units[i].temperature_state]);
110 payload = xstrcat(payload, (char *)"\",\"address\":\"");
111 payload = xstrcat(payload, (char *)units[i].temperature_address);
112 payload = xstrcat(payload, (char *)"\",\"temperature\":");
113 sprintf(buf, "%.3f", units[i].temperature / 1000.0);
114 payload = xstrcat(payload, buf);
115
116 // pressure_state pressure_channel pressure_voltage pressure_zero pressure
117 payload = xstrcat(payload, (char *)"},\"pressure\":{\"state\":\"");
118 payload = xstrcat(payload, (char *)sensState[units[i].pressure_state]);
119 payload = xstrcat(payload, (char *)"\",\"channel\":");
120 sprintf(buf, "%d", units[i].pressure_channel);
121 payload = xstrcat(payload, buf);
122 payload = xstrcat(payload, (char *)",\"voltage\":");
123 sprintf(buf, "%.3f", units[i].pressure_voltage / 1000.0);
124 payload = xstrcat(payload, buf);
125 payload = xstrcat(payload, (char *)",\"zero\":");
126 sprintf(buf, "%.3f", units[i].pressure_zero / 1000.0);
127 payload = xstrcat(payload, buf);
128 payload = xstrcat(payload, (char *)",\"bar\":");
129 sprintf(buf, "%.3f", units[i].pressure / 1000.0);
130 payload = xstrcat(payload, buf);
131 payload = xstrcat(payload, (char *)"},\"mode\":\"");
132 payload = xstrcat(payload, (char *)unitMode[units[i].mode]);
133 payload = xstrcat(payload, (char *)"\"}");
134 return payload;
135 }
136
137
138
139 void publishUnits(void)
140 {
141 char *topic = NULL, *payload = NULL, *payloadu = NULL;
142 int i;
143 bool comma = false;
144
145 payload = payload_header();
146 payload = xstrcat(payload, (char *)"{\"units\":[");
147 for (i = 0; i < 3; i++) {
148 if (comma)
149 payload = xstrcat(payload, (char *)",");
150 payloadu = unit_data(i);
151 payload = xstrcat(payload, payloadu);
152 comma = true;
153 free(payloadu);
154 payloadu = NULL;
155 }
156 payload = xstrcat(payload, (char *)"]}}");
157 topic = topic_base((char *)"DBIRTH");
158 publisher(topic, payload);
159 free(topic);
160 topic = NULL;
161 free(payload);
162 payload = NULL;
163 }
164
165
166
167 void publishNode(void)
168 {
169 char *topic = NULL, *payload = NULL, buf[64];
170 esp_chip_info_t chip_info;
171
172 esp_chip_info(&chip_info);
173 payload = payload_header();
174 payload = xstrcat(payload, (char *)"{\"uuid\":\"");
175 payload = xstrcat(payload, config.uuid);
176 payload = xstrcat(payload, (char *)"\",");
177 payload = xstrcat(payload, (char *)"\"properties\":{\"hardwaremake\":\"Unknown\",\"hardwaremodel\":\"");
178 sprintf(buf, "ESP32 %d CPU WiFi%s%s rev %d", chip_info.cores,
179 (chip_info.features & CHIP_FEATURE_BT) ? "/BT" : "",
180 (chip_info.features & CHIP_FEATURE_BLE) ? "/BLE" : "",
181 chip_info.revision);
182 payload = xstrcat(payload, buf);
183 payload = xstrcat(payload, (char *)"\",\"os\":\"FreeRTOS\",\"os_version\":\"Unknown\",\"FW\":\"");
184 payload = xstrcat(payload, (char *)app_desc->version);
185 payload = xstrcat(payload, (char *)"\"}");
186
187 if (xSemaphoreTake(xSemaphoreDS18B20, 10) == pdTRUE) {
188 payload = xstrcat(payload, (char *)",\"THB\":{\"temperature\":");
189 sprintf(buf, "%.3f", ds18b20_state->bottle_temperature);
190 payload = xstrcat(payload, buf);
191 payload = xstrcat(payload, (char *)"}");
192 xSemaphoreGive(xSemaphoreDS18B20);
193 }
194
195 if (xSemaphoreTake(xSemaphoreWiFi, 10) == pdTRUE) {
196 payload = xstrcat(payload, (char *)",\"net\":{\"address\":\"");
197 payload = xstrcat(payload, wifi_state->STA_ip);
198 payload = xstrcat(payload, (char *)"\",\"ifname\":\"sta\",\"rssi\":");
199 sprintf(buf, "%d", wifi_state->STA_rssi);
200 payload = xstrcat(payload, buf);
201 payload = xstrcat(payload, (char *)"}");
202 xSemaphoreGive(xSemaphoreWiFi);
203 }
204
205 payload = xstrcat(payload, (char *)"}}");
206 // Only NBIRTH messages, no NDATA in this project.
207 topic = topic_base((char *)"NBIRTH");
208 publisher(topic, payload);
209 free(topic);
210 topic = NULL;
211 free(payload);
212 payload = NULL;
213 }
214
215
216
217 static esp_err_t mqtt_event_handler_cb(esp_mqtt_event_handle_t event)
218 {
219 switch (event->event_id) {
220
221 case MQTT_EVENT_CONNECTED:
222 ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED");
223 xEventGroupSetBits(xEventGroupMQTT, TASK_MQTT_CONNECTED);
224
225 // topic = topic_base((char *)"NCMD");
226 // topic = xstrcat(topic, (char *)"/#");
227 // ESP_LOGI(TAG, "Subscribe %s", topic);
228 // msg_id = esp_mqtt_client_subscribe(client, topic, 0);
229 // ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id);
230 // free(topic);
231
232 // topic = topic_base((char *)"DCMD");
233 // topic = xstrcat(topic, (char *)"/#");
234 // ESP_LOGI(TAG, "Subscribe %s", topic);
235 // msg_id = esp_mqtt_client_subscribe(client, topic, 1);
236 // ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id);
237 // free(topic);
238 // topic = NULL;
239 break;
240
241 case MQTT_EVENT_DISCONNECTED:
242 ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED");
243 xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECTED);
244 break;
245
246 case MQTT_EVENT_SUBSCRIBED:
247 ESP_LOGI(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d", event->msg_id);
248 break;
249
250 case MQTT_EVENT_UNSUBSCRIBED:
251 ESP_LOGI(TAG, "MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event->msg_id);
252 break;
253
254 case MQTT_EVENT_PUBLISHED:
255 ESP_LOGI(TAG, "MQTT_EVENT_PUBLISHED, msg_id=%d", event->msg_id);
256 break;
257
258 case MQTT_EVENT_DATA:
259 ESP_LOGI(TAG, "MQTT_EVENT_DATA");
260 printf("TOPIC=%.*s\r\n", event->topic_len, event->topic);
261 printf("DATA=%.*s\r\n", event->data_len, event->data);
262 break;
263
264 case MQTT_EVENT_ERROR:
265 ESP_LOGI(TAG, "MQTT_EVENT_ERROR");
266 break;
267
268 case MQTT_EVENT_BEFORE_CONNECT:
269 ESP_LOGI(TAG, "MQTT_EVENT_BEFORE_CONNECT");
270 // Configure connection can be here.
271 break;
272
273 default:
274 ESP_LOGI(TAG, "Other event id:%d", event->event_id);
275 break;
276 }
277 return ESP_OK;
278 }
279
280
281
282 static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data) {
283 ESP_LOGI(TAG, "Event dispatched from event loop base=%s, event_id=%d", base, event_id);
284 mqtt_event_handler_cb(event_data);
285 }
286
287
288
289
290 /*
291 * Task to read temperature sensors on request.
292 */
293 void task_mqtt(void *pvParameter)
294 {
295 esp_err_t err;
296
297 ESP_LOGI(TAG, "Starting MQTT task");
298
299 /*
300 * Initialize Sequence counter from NVS
301 */
302 err = nvs_open("storage", NVS_READWRITE, &seq_handle);
303 if (err != ESP_OK) {
304 ESP_LOGI(TAG, "Error (%s) opening NVS handle", esp_err_to_name(err));
305 } else {
306 err = nvs_get_u64(seq_handle, "Sequence_cnt", &Sequence);
307 switch (err) {
308 case ESP_OK:
309 ESP_LOGI(TAG, "Sequence counter from NVS = %lld", Sequence);
310 break;
311
312 case ESP_ERR_NVS_NOT_FOUND:
313 ESP_LOGI(TAG, "Sequence counter not found");
314 break;
315
316 default:
317 ESP_LOGI(TAG, "Error (%s) init Sequence", esp_err_to_name(err));
318 break;
319 }
320 }
321
322 /* event handler and event group for the wifi driver */
323 xEventGroupMQTT = xEventGroupCreate();
324 EventBits_t uxBits;
325 esp_mqtt_client_config_t mqtt_cfg = {
326 .uri = "mqtt://seaport.mbse.ym",
327 };
328 /*esp_mqtt_client_handle_t */ client = esp_mqtt_client_init(&mqtt_cfg);
329 esp_mqtt_client_register_event(client, ESP_EVENT_ANY_ID, mqtt_event_handler, client);
330
331 /*
332 * Task loop forever.
333 */
334 while (1) {
335
336 uxBits = xEventGroupWaitBits(xEventGroupMQTT, TASK_MQTT_CONNECT | TASK_MQTT_DISCONNECT, pdFALSE, pdFALSE, portMAX_DELAY );
337
338 if (uxBits & TASK_MQTT_CONNECT) {
339 ESP_LOGI(TAG, "Request MQTT connect");
340 err = esp_mqtt_client_start(client);
341 ESP_LOGI(TAG, "Result %s", esp_err_to_name(err));
342 xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECT);
343
344 } else if (uxBits & TASK_MQTT_DISCONNECT) {
345 ESP_LOGI(TAG, "Request MQTT disconnect");
346 // publish disconnect messages
347 err = esp_mqtt_client_stop(client);
348 ESP_LOGI(TAG, "Result %s", esp_err_to_name(err));
349
350 err = nvs_commit(seq_handle);
351 if (err != ESP_OK)
352 ESP_LOGE(TAG, "Error %s commit NVS", esp_err_to_name(err));
353 xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_DISCONNECT);
354 xEventGroupClearBits(xEventGroupMQTT, TASK_MQTT_CONNECTED);
355 }
356 vTaskDelay( (TickType_t)10);
357 }
358 }
359

mercurial