155 |
157 |
156 |
158 |
157 |
159 |
158 void my_message_callback(struct mosquitto *my_mosq, void *userdata, const struct mosquitto_message *message) |
160 void my_message_callback(struct mosquitto *my_mosq, void *userdata, const struct mosquitto_message *message) |
159 { |
161 { |
|
162 char *message_type; |
|
163 struct json_object *jobj, *metric, *val; |
|
164 time_t timestamp; |
|
165 int timediff; |
|
166 |
160 if (message->payloadlen) { |
167 if (message->payloadlen) { |
|
168 /* |
|
169 * Process received commands |
|
170 */ |
|
171 strtok(message->topic, "/"); // Ignore mbv1.0 |
|
172 strtok(NULL, "/"); // Ignore group_id |
|
173 message_type = strtok(NULL, "/"); |
|
174 |
|
175 jobj = json_tokener_parse(message->payload); |
|
176 if (json_object_object_get_ex(jobj, "timestamp", &val)) { |
|
177 timestamp = json_object_get_int(val); |
|
178 timediff = (int)timestamp - time(NULL); |
|
179 if ((timediff < 61) && (timediff > -61)) { |
|
180 if (json_object_object_get_ex(jobj, "metric", &metric)) { |
|
181 if ((json_object_object_get_ex(metric, "Node Control/Reboot", &val)) && (strcmp(message_type, "NCMD") == 0)) { |
|
182 if (json_object_get_boolean(val) == true) { |
|
183 syslog(LOG_NOTICE, "MQTT: `Node Control/Reboot' command"); |
|
184 /* |
|
185 * Reboot. The erver process will restart which is handled |
|
186 * in the main thread loop. |
|
187 */ |
|
188 my_reboot = my_shutdown = TRUE; |
|
189 return; |
|
190 } |
|
191 } |
|
192 if ((json_object_object_get_ex(metric, "Node Control/Rebirth", &val)) && (strcmp(message_type, "NCMD") == 0)) { |
|
193 if (json_object_get_boolean(val) == true) { |
|
194 /* |
|
195 * Resend all birth certificates. |
|
196 */ |
|
197 syslog(LOG_NOTICE, "MQTT: `Node Control/Rebirth' command"); |
|
198 publishNData(true, 0); |
|
199 publishDBirthAll(); |
|
200 return; |
|
201 } |
|
202 } |
|
203 printf("metric: %s\n", (char *)json_object_get_string(metric)); |
|
204 syslog(LOG_NOTICE, "MQTT: %s payload not understood\n", (char *)message->payload); |
|
205 return; |
|
206 } |
|
207 } else { |
|
208 syslog(LOG_NOTICE, "MQTT: got payload with timestamp %d seconds error", timediff); |
|
209 return; |
|
210 } |
|
211 } |
|
212 |
161 syslog(LOG_NOTICE, "MQTT: message callback %s :: %d", message->topic, message->payloadlen); |
213 syslog(LOG_NOTICE, "MQTT: message callback %s :: %d", message->topic, message->payloadlen); |
162 // TODO: process subscribed topics here. |
|
163 |
|
164 } else { |
214 } else { |
165 syslog(LOG_NOTICE, "MQTT: message callback %s (null)", message->topic); |
215 syslog(LOG_NOTICE, "MQTT: message callback %s (null)", message->topic); |
166 } |
216 } |
167 } |
217 } |
168 |
218 |