thermferm/mqtt.c

changeset 550
04c942cded91
parent 549
ee1bcad035f0
child 554
ab9f22ab57b5
equal deleted inserted replaced
549:ee1bcad035f0 550:04c942cded91
24 #include "xutil.h" 24 #include "xutil.h"
25 #include "mqtt.h" 25 #include "mqtt.h"
26 26
27 extern sys_config Config; 27 extern sys_config Config;
28 extern int debug; 28 extern int debug;
29 extern int my_shutdown;
30 extern int my_reboot;
29 extern const char UNITMODE[5][8]; 31 extern const char UNITMODE[5][8];
30 extern const char PROFSTATE[5][6]; 32 extern const char PROFSTATE[5][6];
31 extern const char TEMPSTATE[3][8]; 33 extern const char TEMPSTATE[3][8];
32 34
33 int Sequence = 0; 35 int Sequence = 0;
155 157
156 158
157 159
158 void my_message_callback(struct mosquitto *my_mosq, void *userdata, const struct mosquitto_message *message) 160 void my_message_callback(struct mosquitto *my_mosq, void *userdata, const struct mosquitto_message *message)
159 { 161 {
162 char *message_type;
163 struct json_object *jobj, *metric, *val;
164 time_t timestamp;
165 int timediff;
166
160 if (message->payloadlen) { 167 if (message->payloadlen) {
168 /*
169 * Process received commands
170 */
171 strtok(message->topic, "/"); // Ignore mbv1.0
172 strtok(NULL, "/"); // Ignore group_id
173 message_type = strtok(NULL, "/");
174
175 jobj = json_tokener_parse(message->payload);
176 if (json_object_object_get_ex(jobj, "timestamp", &val)) {
177 timestamp = json_object_get_int(val);
178 timediff = (int)timestamp - time(NULL);
179 if ((timediff < 61) && (timediff > -61)) {
180 if (json_object_object_get_ex(jobj, "metric", &metric)) {
181 if ((json_object_object_get_ex(metric, "Node Control/Reboot", &val)) && (strcmp(message_type, "NCMD") == 0)) {
182 if (json_object_get_boolean(val) == true) {
183 syslog(LOG_NOTICE, "MQTT: `Node Control/Reboot' command");
184 /*
185 * Reboot. The server process will restart which is handled
186 * in the main thread loop.
187 */
188 my_reboot = my_shutdown = TRUE;
189 return;
190 }
191 }
192 if ((json_object_object_get_ex(metric, "Node Control/Rebirth", &val)) && (strcmp(message_type, "NCMD") == 0)) {
193 if (json_object_get_boolean(val) == true) {
194 /*
195 * Resend all birth certificates.
196 */
197 syslog(LOG_NOTICE, "MQTT: `Node Control/Rebirth' command");
198 publishNData(true, 0);
199 publishDBirthAll();
200 return;
201 }
202 }
203 printf("metric: %s\n", (char *)json_object_get_string(metric));
204 syslog(LOG_NOTICE, "MQTT: %s payload not understood\n", (char *)message->payload);
205 return;
206 }
207 } else {
208 syslog(LOG_NOTICE, "MQTT: got payload with timestamp %d seconds error", timediff);
209 return;
210 }
211 }
212
161 syslog(LOG_NOTICE, "MQTT: message callback %s :: %d", message->topic, message->payloadlen); 213 syslog(LOG_NOTICE, "MQTT: message callback %s :: %d", message->topic, message->payloadlen);
162 // TODO: process subscribed topics here.
163
164 } else { 214 } else {
165 syslog(LOG_NOTICE, "MQTT: message callback %s (null)", message->topic); 215 syslog(LOG_NOTICE, "MQTT: message callback %s (null)", message->topic);
166 } 216 }
167 } 217 }
168 218

mercurial